diff --git a/crates/pod/src/feature/plugin.rs b/crates/pod/src/feature/plugin.rs index e7109092..8cf585f0 100644 --- a/crates/pod/src/feature/plugin.rs +++ b/crates/pod/src/feature/plugin.rs @@ -12,7 +12,10 @@ use std::fs; use std::io::{Read as _, Write as _}; use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; use std::path::{Component, Path, PathBuf}; -use std::sync::{Arc, Mutex, OnceLock}; +use std::sync::{ + Arc, Mutex, OnceLock, + atomic::{AtomicBool, Ordering}, +}; use std::time::{Duration, Instant}; use async_trait::async_trait; @@ -35,6 +38,7 @@ use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::protocol::{Message, WebSocketConfig}; const LEGACY_PLUGIN_RUNTIME_WASM_KIND: &str = "wasm"; +const PLUGIN_SERVICE_WEBSOCKET_RECV_TIMEOUT: Duration = Duration::from_millis(250); use super::{ FeatureDescriptor, FeatureId, FeatureInstallContext, FeatureInstallError, FeatureModule, @@ -2440,7 +2444,7 @@ struct PluginRequestResponse { truncated: bool, } -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] struct PluginWebSocketOpenRequest { url: String, @@ -2979,6 +2983,10 @@ pub enum PluginInstanceDiagnosticKind { ServiceOutputCommandRecorded, ServiceOutputCommandRejected, ServiceOutputCommandUnsupported, + ServiceWebSocketConnected, + ServiceWebSocketClosed, + ServiceWebSocketError, + ServiceWebSocketSendFailed, } #[derive(Clone, Debug, PartialEq, Eq, Serialize)] @@ -3015,6 +3023,27 @@ pub struct PluginIngressDispatchCounters { pub timed_out: u64, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub enum PluginServiceWebSocketConnectionState { + Connecting, + Connected, + Closed, + Failed, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct PluginServiceWebSocketConnectionStatus { + pub url: String, + pub ingress_name: String, + pub state: PluginServiceWebSocketConnectionState, + pub last_frame_at: Option, + pub last_error: Option, + pub received_text_frames: u64, + pub sent_text_frames: u64, + pub queue_drops: u64, + pub send_failures: u64, +} + #[derive(Clone, Debug, PartialEq, Serialize)] pub struct PluginInstanceStatus { pub plugin_ref: String, @@ -3027,6 +3056,7 @@ pub struct PluginInstanceStatus { /// Last bounded Service output command outcomes. These are produced only by /// Service/Ingress dispatch and are intentionally separate from ToolOutput. pub output_command_results: Vec, + pub websocket_connections: Vec, pub diagnostics: Vec, } @@ -3321,11 +3351,496 @@ impl PluginInstanceRegistry { } } +#[derive(Clone, Debug)] +struct PluginServiceWebSocketSubscription { + ingress_name: String, + source: String, + url: String, +} + +#[derive(Clone, Default)] +struct PluginServiceWebSocketDriver { + inner: Arc>>, +} + +struct PluginServiceWebSocketConnection { + status: PluginServiceWebSocketConnectionStatus, + connection: Option>>>, + stop: Arc, +} + +impl PluginServiceWebSocketDriver { + fn start_connection( + &self, + handle: PluginInstanceHandle, + client: Arc, + subscription: PluginServiceWebSocketSubscription, + ) { + if self.connection_count() >= PLUGIN_WEBSOCKET_MAX_OPEN_CONNECTIONS { + let message = format!( + "host-owned WebSocket connection limit ({}) exceeded for {}", + PLUGIN_WEBSOCKET_MAX_OPEN_CONNECTIONS, subscription.url + ); + self.insert_failed_status(&subscription, message.clone()); + handle.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketError, + message, + true, + ); + return; + } + + let request = PluginWebSocketOpenRequest { + url: subscription.url.clone(), + protocols: Vec::new(), + headers: Vec::new(), + }; + self.insert_status( + &subscription, + PluginServiceWebSocketConnectionState::Connecting, + None, + ); + + let (request, url) = match validate_plugin_service_websocket_open_request(&handle, &request) + { + Ok(value) => value, + Err(message) => { + self.update_status_error( + &subscription.url, + PluginServiceWebSocketConnectionState::Failed, + message.clone(), + ); + handle.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketError, + message, + true, + ); + return; + } + }; + if !client.supports_bounded_open() { + let message = "host-owned WebSocket client cannot guarantee bounded/cancellable open; refusing to dial".to_string(); + self.update_status_error( + &subscription.url, + PluginServiceWebSocketConnectionState::Failed, + message.clone(), + ); + handle.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketError, + message, + true, + ); + return; + } + let connection = match client.open(&request, &url, PluginWebSocketLimits::default()) { + Ok(connection) => Arc::new(Mutex::new(connection)), + Err(error) => { + let message = format!( + "host-owned WebSocket open failed for {}: {}", + safe_url(&url), + error.0 + ); + self.update_status_error( + &subscription.url, + PluginServiceWebSocketConnectionState::Failed, + message.clone(), + ); + handle.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketError, + message, + true, + ); + return; + } + }; + + let stop = Arc::new(AtomicBool::new(false)); + self.attach_connection(&subscription.url, connection.clone(), stop.clone()); + self.update_status_connected(&subscription.url); + handle.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketConnected, + format!("host-owned WebSocket connected: {}", safe_url(&url)), + false, + ); + + let driver = self.clone(); + std::thread::spawn(move || { + driver.reader_loop(handle, subscription, connection, stop); + }); + } + + fn reader_loop( + &self, + handle: PluginInstanceHandle, + subscription: PluginServiceWebSocketSubscription, + connection: Arc>>, + stop: Arc, + ) { + while !stop.load(Ordering::SeqCst) { + let recv = { + let mut connection = connection + .lock() + .expect("service websocket connection poisoned"); + connection.recv_text( + PLUGIN_SERVICE_WEBSOCKET_RECV_TIMEOUT, + PLUGIN_WEBSOCKET_MAX_MESSAGE_BYTES, + ) + }; + match recv { + Ok(PluginWebSocketRecvResponse::Text { text }) => { + self.record_frame(&subscription.url); + let event = PluginIngressEvent::new( + subscription.ingress_name.clone(), + "websocket_text", + subscription.source.clone(), + serde_json::json!({ + "url": subscription.url, + "text": text, + }), + ); + if let Err(error) = handle.deliver_ingress(&subscription.ingress_name, event) { + let message = error.bounded_message(); + self.record_queue_drop(&subscription.url, message.clone()); + handle.record_service_websocket_diagnostic( + error.diagnostic_kind(), + format!("host-owned WebSocket ingress drop: {message}"), + true, + ); + } + } + Ok(PluginWebSocketRecvResponse::Closed) => { + let message = "host-owned WebSocket closed by peer".to_string(); + self.update_status_error( + &subscription.url, + PluginServiceWebSocketConnectionState::Closed, + message.clone(), + ); + handle.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketClosed, + message.clone(), + false, + ); + let event = PluginIngressEvent::new( + subscription.ingress_name.clone(), + "websocket_close", + subscription.source.clone(), + serde_json::json!({"url": subscription.url, "reason": message}), + ); + let _ = handle.deliver_ingress(&subscription.ingress_name, event); + break; + } + Err(error) if error.0.contains("timed out") => continue, + Err(error) => { + let message = format!("host-owned WebSocket receive failed: {}", error.0); + self.update_status_error( + &subscription.url, + PluginServiceWebSocketConnectionState::Failed, + message.clone(), + ); + handle.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketError, + message.clone(), + true, + ); + let event = PluginIngressEvent::new( + subscription.ingress_name.clone(), + "websocket_error", + subscription.source.clone(), + serde_json::json!({"url": subscription.url, "error": message}), + ); + let _ = handle.deliver_ingress(&subscription.ingress_name, event); + break; + } + } + } + } + + fn send_text(&self, url: &str, text: &str) -> Result { + if text.len() > PLUGIN_WEBSOCKET_MAX_TEXT_BYTES { + return Err(format!( + "websocket_send text exceeds {} bytes", + PLUGIN_WEBSOCKET_MAX_TEXT_BYTES + )); + } + let parsed = + reqwest::Url::parse(url).map_err(|error| format!("invalid WebSocket URL: {error}"))?; + let key = parsed.as_str().to_string(); + let (display_url, connection) = { + let guard = self + .inner + .lock() + .expect("service websocket driver poisoned"); + let entry = guard.get(&key).ok_or_else(|| { + format!( + "no host-owned WebSocket connection is active for {}", + safe_url(&parsed) + ) + })?; + let Some(connection) = &entry.connection else { + return Err(format!( + "host-owned WebSocket connection is not connected for {}", + safe_url(&parsed) + )); + }; + (entry.status.url.clone(), connection.clone()) + }; + let send = connection + .lock() + .expect("service websocket connection poisoned") + .send_text(text); + match send { + Ok(()) => { + self.record_send_success(&key); + Ok(format!( + "websocket_send sent {} bytes to {display_url}", + text.len() + )) + } + Err(error) => { + let message = format!("websocket_send failed for {display_url}: {}", error.0); + self.record_send_failure(&key, message.clone()); + Err(message) + } + } + } + + fn statuses(&self) -> Vec { + let mut statuses: Vec<_> = self + .inner + .lock() + .expect("service websocket driver poisoned") + .values() + .map(|entry| entry.status.clone()) + .collect(); + statuses.sort_by(|left, right| left.url.cmp(&right.url)); + statuses + } + + fn stop_all(&self) { + let connections: Vec<_> = { + let mut guard = self + .inner + .lock() + .expect("service websocket driver poisoned"); + guard + .values_mut() + .filter_map(|entry| { + entry.stop.store(true, Ordering::SeqCst); + entry.status.state = PluginServiceWebSocketConnectionState::Closed; + entry.connection.clone() + }) + .collect() + }; + for connection in connections { + if let Ok(mut connection) = connection.lock() { + let _ = connection.close(); + } + } + } + + fn connection_count(&self) -> usize { + self.inner + .lock() + .expect("service websocket driver poisoned") + .len() + } + + fn insert_status( + &self, + subscription: &PluginServiceWebSocketSubscription, + state: PluginServiceWebSocketConnectionState, + error: Option, + ) { + let url = reqwest::Url::parse(&subscription.url) + .map(|url| safe_url(&url)) + .unwrap_or_else(|_| safe_fs_path(&subscription.url)); + self.inner + .lock() + .expect("service websocket driver poisoned") + .insert( + subscription.url.clone(), + PluginServiceWebSocketConnection { + status: PluginServiceWebSocketConnectionStatus { + url, + ingress_name: subscription.ingress_name.clone(), + state, + last_frame_at: None, + last_error: error, + received_text_frames: 0, + sent_text_frames: 0, + queue_drops: 0, + send_failures: 0, + }, + connection: None, + stop: Arc::new(AtomicBool::new(false)), + }, + ); + } + + fn insert_failed_status( + &self, + subscription: &PluginServiceWebSocketSubscription, + error: String, + ) { + self.insert_status( + subscription, + PluginServiceWebSocketConnectionState::Failed, + Some(error), + ); + } + + fn attach_connection( + &self, + key: &str, + connection: Arc>>, + stop: Arc, + ) { + if let Some(entry) = self + .inner + .lock() + .expect("service websocket driver poisoned") + .get_mut(key) + { + entry.connection = Some(connection); + entry.stop = stop; + } + } + + fn update_status_connected(&self, key: &str) { + if let Some(entry) = self + .inner + .lock() + .expect("service websocket driver poisoned") + .get_mut(key) + { + entry.status.state = PluginServiceWebSocketConnectionState::Connected; + entry.status.last_error = None; + } + } + + fn update_status_error( + &self, + key: &str, + state: PluginServiceWebSocketConnectionState, + error: String, + ) { + if let Some(entry) = self + .inner + .lock() + .expect("service websocket driver poisoned") + .get_mut(key) + { + entry.status.state = state; + entry.status.last_error = Some(bounded_message(redact_secret_like(&error))); + } + } + + fn record_frame(&self, key: &str) { + if let Some(entry) = self + .inner + .lock() + .expect("service websocket driver poisoned") + .get_mut(key) + { + entry.status.last_frame_at = + Some(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)); + entry.status.received_text_frames += 1; + } + } + + fn record_queue_drop(&self, key: &str, error: String) { + if let Some(entry) = self + .inner + .lock() + .expect("service websocket driver poisoned") + .get_mut(key) + { + entry.status.queue_drops += 1; + entry.status.last_error = Some(bounded_message(redact_secret_like(&error))); + } + } + + fn record_send_success(&self, key: &str) { + if let Some(entry) = self + .inner + .lock() + .expect("service websocket driver poisoned") + .get_mut(key) + { + entry.status.sent_text_frames += 1; + } + } + + fn record_send_failure(&self, key: &str, error: String) { + if let Some(entry) = self + .inner + .lock() + .expect("service websocket driver poisoned") + .get_mut(key) + { + entry.status.send_failures += 1; + entry.status.last_error = Some(bounded_message(redact_secret_like(&error))); + } + } +} + +fn validate_plugin_service_websocket_open_request( + handle: &PluginInstanceHandle, + request: &PluginWebSocketOpenRequest, +) -> Result<(PluginWebSocketOpenRequest, reqwest::Url), String> { + let record = { + let instance = handle.0.lock().expect("plugin instance poisoned"); + instance.record.clone() + }; + authorize_plugin_host_api(&record, PluginHostApi::WebSocket) + .map_err(|error| format!("host-owned websocket not granted: {}", error.0))?; + let bytes = serde_json::to_vec(request) + .map_err(|error| format!("failed to encode host-owned websocket open request: {error}"))?; + validate_plugin_websocket_open_request(&record, &bytes).map_err(|error| error.0) +} + +fn parse_plugin_service_websocket_source(source: &str) -> Option> { + let trimmed = source.trim(); + let raw = if let Some(rest) = trimmed.strip_prefix("websocket:") { + rest.trim() + } else if trimmed.starts_with("ws://") || trimmed.starts_with("wss://") { + trimmed + } else { + return None; + }; + if raw.is_empty() { + return Some(Err("websocket source URL is empty".to_string())); + } + let parsed = match reqwest::Url::parse(raw) { + Ok(parsed) => parsed, + Err(error) => return Some(Err(format!("invalid WebSocket URL: {error}"))), + }; + match parsed.scheme() { + "ws" | "wss" => Some(Ok(parsed.as_str().to_string())), + other => Some(Err(format!("unsupported WebSocket URL scheme: {other}"))), + } +} + #[derive(Clone)] pub struct PluginInstanceHandle(Arc>); impl PluginInstanceHandle { fn new(record: ResolvedPluginRecord) -> Result { + Self::new_with_service_websocket_client(record, Arc::new(TungstenitePluginWebSocketClient)) + } + + #[cfg(test)] + fn new_with_test_websocket_client( + record: ResolvedPluginRecord, + service_websocket_client: Arc, + ) -> Result { + Self::new_with_service_websocket_client(record, service_websocket_client) + } + + fn new_with_service_websocket_client( + record: ResolvedPluginRecord, + service_websocket_client: Arc, + ) -> Result { let runtime = PluginInstanceRuntime::new(&record)?; let mut instance = PluginInstance { record, @@ -3337,10 +3852,14 @@ impl PluginInstanceHandle { dispatch_counters: PluginIngressDispatchCounters::default(), last_error: None, output_command_results: Vec::new(), + service_websockets: PluginServiceWebSocketDriver::default(), + service_websocket_client, diagnostics: Vec::new(), }; instance.start()?; - Ok(Self(Arc::new(Mutex::new(instance)))) + let handle = Self(Arc::new(Mutex::new(instance))); + handle.start_service_websockets(); + Ok(handle) } fn handle_tool(&self, tool_name: &str, input: Vec) -> Result { @@ -3378,6 +3897,44 @@ impl PluginInstanceHandle { instance.diagnostics.push(diagnostic); } } + + fn record_service_websocket_diagnostic( + &self, + kind: PluginInstanceDiagnosticKind, + message: impl Into, + mark_error: bool, + ) { + if let Ok(mut instance) = self.0.lock() { + let message = bounded_message(redact_secret_like(&message.into())); + if mark_error { + instance.last_error = Some(message.clone()); + } + let state = instance.lifecycle.clone(); + instance + .diagnostics + .push(PluginInstanceDiagnostic::with_kind(kind, state, message)); + } + } + + fn start_service_websockets(&self) { + let (driver, client, subscriptions, diagnostics) = { + let instance = self.0.lock().expect("plugin instance poisoned"); + let (subscriptions, diagnostics) = instance.service_websocket_subscriptions(); + let driver = instance.service_websockets.clone(); + let client = instance.service_websocket_client.clone(); + (driver, client, subscriptions, diagnostics) + }; + for diagnostic in diagnostics { + self.record_service_websocket_diagnostic( + PluginInstanceDiagnosticKind::ServiceWebSocketError, + diagnostic, + true, + ); + } + for subscription in subscriptions { + driver.start_connection(self.clone(), client.clone(), subscription); + } + } } struct PluginInstance { @@ -3390,6 +3947,8 @@ struct PluginInstance { dispatch_counters: PluginIngressDispatchCounters, last_error: Option, output_command_results: Vec, + service_websockets: PluginServiceWebSocketDriver, + service_websocket_client: Arc, diagnostics: Vec, } @@ -3440,6 +3999,35 @@ impl PluginInstance { Ok(()) } + fn service_websocket_subscriptions( + &self, + ) -> (Vec, Vec) { + if !surface_enabled(&self.record, PluginSurface::Service) + || !surface_enabled(&self.record, PluginSurface::Ingress) + { + return (Vec::new(), Vec::new()); + } + let mut subscriptions = Vec::new(); + let mut diagnostics = Vec::new(); + for ingress in &self.record.manifest.ingresses { + for source in &ingress.sources { + match parse_plugin_service_websocket_source(source) { + None => {} + Some(Ok(url)) => subscriptions.push(PluginServiceWebSocketSubscription { + ingress_name: ingress.name.clone(), + source: source.clone(), + url, + }), + Some(Err(message)) => diagnostics.push(format!( + "invalid WebSocket ingress source for {}: {message}", + ingress.name + )), + } + } + } + (subscriptions, diagnostics) + } + fn handle_tool( &mut self, tool_name: &str, @@ -3726,6 +4314,7 @@ impl PluginInstance { "plugin service stop requested; ingress queue is closed", )); self.ingress_queue.clear(); + self.service_websockets.stop_all(); let stop_result = match &mut self.runtime { PluginInstanceRuntime::ComponentToolAdapter => Ok(()), #[cfg(test)] @@ -3764,6 +4353,7 @@ impl PluginInstance { last_error: self.last_error.clone(), dispatch_counters: self.dispatch_counters.clone(), output_command_results: self.output_command_results.clone(), + websocket_connections: self.service_websockets.statuses(), diagnostics: self.diagnostics.clone(), } } @@ -3952,10 +4542,10 @@ impl PluginInstance { &self, payload: &PluginServiceWebSocketSendCommandPayload, ) -> Result<(), String> { - if payload.text.len() > PLUGIN_WEBSOCKET_MAX_MESSAGE_BYTES { + if payload.text.len() > PLUGIN_WEBSOCKET_MAX_TEXT_BYTES { return Err(format!( "websocket_send text exceeds {} bytes", - PLUGIN_WEBSOCKET_MAX_MESSAGE_BYTES + PLUGIN_WEBSOCKET_MAX_TEXT_BYTES )); } let url = reqwest::Url::parse(&payload.url) @@ -4015,10 +4605,30 @@ impl PluginInstance { ) } PluginServiceOutputCommandKind::WebSocketSend => { - PluginServiceOutputCommandResult::unsupported( - &command, - "websocket_send output command is grant-checked but WebSocket send transport is unsupported in v0", - ) + let payload: PluginServiceWebSocketSendCommandPayload = + match serde_json::from_value(command.payload.clone()) { + Ok(payload) => payload, + Err(error) => { + return PluginServiceOutputCommandResult::rejected_for( + &command, + format!("invalid websocket_send payload: {error}"), + ); + } + }; + match self + .service_websockets + .send_text(&payload.url, &payload.text) + { + Ok(message) => PluginServiceOutputCommandResult::recorded(&command, message), + Err(message) => { + self.diagnostics.push(PluginInstanceDiagnostic::with_kind( + PluginInstanceDiagnosticKind::ServiceWebSocketSendFailed, + self.lifecycle.clone(), + bounded_message(redact_secret_like(&message)), + )); + PluginServiceOutputCommandResult::rejected_for(&command, message) + } + } } } } @@ -5284,6 +5894,123 @@ mod tests { record.grants.websocket.push(target); } + fn add_websocket_ingress_source(record: &mut ResolvedPluginRecord, source: &str) { + let ingress = record + .manifest + .ingresses + .iter_mut() + .find(|ingress| ingress.name == "shared_ingress") + .expect("shared ingress"); + ingress.event_kinds = vec![ + "test".into(), + "websocket_text".into(), + "websocket_close".into(), + "websocket_error".into(), + ]; + ingress.sources.push(source.to_string()); + } + + fn service_websocket_record() -> ResolvedPluginRecord { + let mut record = test_service_ingress_record(); + add_websocket_output_grant(&mut record); + add_websocket_ingress_source(&mut record, "websocket:wss://ws.example.test/events"); + record + } + + #[derive(Clone, Default)] + struct ServiceWebSocketClient { + events: Arc>>>, + sent: Arc>>, + opens: Arc, + send_error: Arc>>, + } + + impl ServiceWebSocketClient { + fn with_events(events: Vec>) -> Self { + Self { + events: Arc::new(Mutex::new(events.into())), + ..Self::default() + } + } + + fn sent(&self) -> Vec { + self.sent.lock().unwrap().clone() + } + + fn fail_sends_with(&self, message: &str) { + *self.send_error.lock().unwrap() = Some(message.to_string()); + } + } + + impl PluginWebSocketClient for ServiceWebSocketClient { + fn supports_bounded_open(&self) -> bool { + true + } + + fn open( + &self, + _request: &PluginWebSocketOpenRequest, + _url: &reqwest::Url, + _limits: PluginWebSocketLimits, + ) -> Result, PluginWebSocketError> { + self.opens.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(Box::new(ServiceWebSocketConnection { + events: self.events.clone(), + sent: self.sent.clone(), + send_error: self.send_error.clone(), + closed: Arc::new(std::sync::atomic::AtomicBool::new(false)), + })) + } + } + + struct ServiceWebSocketConnection { + events: Arc>>>, + sent: Arc>>, + send_error: Arc>>, + closed: Arc, + } + + impl PluginWebSocketConnection for ServiceWebSocketConnection { + fn send_text(&mut self, text: &str) -> Result<(), PluginWebSocketError> { + if let Some(error) = self.send_error.lock().unwrap().clone() { + return Err(PluginWebSocketError::new(error)); + } + self.sent.lock().unwrap().push(text.to_string()); + Ok(()) + } + + fn recv_text( + &mut self, + timeout: Duration, + _max_message_bytes: usize, + ) -> Result { + if self.closed.load(std::sync::atomic::Ordering::SeqCst) { + return Ok(PluginWebSocketRecvResponse::Closed); + } + if let Some(event) = self.events.lock().unwrap().pop_front() { + return event.map_err(PluginWebSocketError::new); + } + std::thread::sleep(timeout.min(Duration::from_millis(10))); + Err(PluginWebSocketError::new("receive timed out")) + } + + fn close(&mut self) -> Result<(), PluginWebSocketError> { + self.closed.store(true, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + } + + fn wait_until(mut condition: impl FnMut() -> bool) { + let deadline = std::time::Instant::now() + Duration::from_secs(2); + while std::time::Instant::now() < deadline { + if condition() { + return; + } + std::thread::sleep(Duration::from_millis(10)); + } + assert!(condition(), "condition did not become true before timeout"); + } + #[test] fn service_selected_ignores_unselected_tool_without_grants() { let mut record = record(vec![tool("hidden_tool")]); @@ -5604,7 +6331,8 @@ mod tests { } #[test] - fn service_output_command_placeholders_are_grant_checked_and_unsupported() { + fn service_output_commands_are_grant_checked_and_supported_transport_rejects_without_connection() + { let mut record = test_service_ingress_record(); add_request_output_grant(&mut record); add_websocket_output_grant(&mut record); @@ -5633,20 +6361,226 @@ mod tests { let report = handle.deliver_ingress("shared_ingress", event).unwrap(); assert_eq!(report.output_command_results.len(), 2); - assert!( - report - .output_command_results - .iter() - .all(|result| { result.status == PluginServiceOutputCommandStatus::Unsupported }) + let request_result = report + .output_command_results + .iter() + .find(|result| result.command_id.as_deref() == Some("cmd-request")) + .unwrap(); + let websocket_result = report + .output_command_results + .iter() + .find(|result| result.command_id.as_deref() == Some("cmd-websocket")) + .unwrap(); + assert_eq!( + request_result.status, + PluginServiceOutputCommandStatus::Unsupported + ); + assert_eq!( + websocket_result.status, + PluginServiceOutputCommandStatus::Rejected ); let status = handle.status(); assert_eq!(status.output_command_results.len(), 2); assert!(status.diagnostics.iter().any(|diagnostic| { diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandUnsupported + && diagnostic.message.contains("cmd-request") + })); + assert!(status.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandRejected && diagnostic.message.contains("cmd-websocket") })); } + #[test] + fn service_websocket_driver_enqueues_incoming_text_and_reports_close() { + let client = ServiceWebSocketClient::with_events(vec![ + Ok(PluginWebSocketRecvResponse::Text { + text: "hello service".into(), + }), + Ok(PluginWebSocketRecvResponse::Closed), + ]); + let handle = PluginInstanceHandle::new_with_test_websocket_client( + service_websocket_record(), + Arc::new(client.clone()), + ) + .unwrap(); + + wait_until(|| handle.status().dispatch_counters.dispatched >= 1); + let status = handle.status(); + assert!(status.dispatch_counters.dispatched >= 1); + assert_eq!(status.websocket_connections.len(), 1); + let connection = &status.websocket_connections[0]; + assert_eq!(connection.received_text_frames, 1); + assert!(connection.last_frame_at.is_some()); + assert_eq!(connection.queue_drops, 0); + assert!(status.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceWebSocketClosed + })); + handle.stop().unwrap(); + } + + #[test] + fn websocket_send_output_command_sends_on_host_owned_connection() { + let client = ServiceWebSocketClient::default(); + let handle = PluginInstanceHandle::new_with_test_websocket_client( + service_websocket_record(), + Arc::new(client.clone()), + ) + .unwrap(); + wait_until(|| { + handle + .status() + .websocket_connections + .iter() + .any(|connection| { + connection.state == PluginServiceWebSocketConnectionState::Connected + }) + }); + let event = test_ingress_event("shared_ingress", json!({})); + let command_value = service_output_command( + &event, + "cmd-websocket", + "websocket_send", + json!({"url": "wss://ws.example.test/events", "text": "pong"}), + ); + let command: PluginServiceOutputCommandEnvelope = + serde_json::from_value(command_value).unwrap(); + + let result = handle + .0 + .lock() + .unwrap() + .execute_service_output_command(command); + + assert_eq!(result.status, PluginServiceOutputCommandStatus::Recorded); + assert_eq!(client.sent(), vec!["pong".to_string()]); + let status = handle.status(); + assert_eq!(status.websocket_connections[0].sent_text_frames, 1); + handle.stop().unwrap(); + } + + #[test] + fn websocket_send_output_command_records_send_failure_diagnostic() { + let client = ServiceWebSocketClient::default(); + client.fail_sends_with("transport write failed"); + let handle = PluginInstanceHandle::new_with_test_websocket_client( + service_websocket_record(), + Arc::new(client), + ) + .unwrap(); + wait_until(|| { + handle + .status() + .websocket_connections + .iter() + .any(|connection| { + connection.state == PluginServiceWebSocketConnectionState::Connected + }) + }); + let event = test_ingress_event("shared_ingress", json!({})); + let command: PluginServiceOutputCommandEnvelope = + serde_json::from_value(service_output_command( + &event, + "cmd-websocket-fail", + "websocket_send", + json!({"url": "wss://ws.example.test/events", "text": "pong"}), + )) + .unwrap(); + + let result = handle + .0 + .lock() + .unwrap() + .execute_service_output_command(command); + + assert_eq!(result.status, PluginServiceOutputCommandStatus::Rejected); + let status = handle.status(); + assert_eq!(status.websocket_connections[0].send_failures, 1); + assert!(status.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceWebSocketSendFailed + && diagnostic.message.contains("transport write failed") + })); + handle.stop().unwrap(); + } + + #[test] + fn websocket_send_rejects_unauthorized_target_before_execution() { + let mut record = service_websocket_record(); + let handle = PluginInstanceHandle::new_with_test_websocket_client( + record.clone(), + Arc::new(ServiceWebSocketClient::default()), + ) + .unwrap(); + let event = test_ingress_event("shared_ingress", json!({})); + let command: PluginServiceOutputCommandEnvelope = + serde_json::from_value(service_output_command( + &event, + "cmd-websocket-denied", + "websocket_send", + json!({"url": "wss://ws.example.test/private", "text": "nope"}), + )) + .unwrap(); + + let error = handle + .0 + .lock() + .unwrap() + .validate_service_output_command_envelope(&command, &event) + .unwrap_err(); + + assert!( + error.contains("websocket_send target denied") + || error.contains("not declared by the plugin manifest"), + "{error}" + ); + assert_eq!(handle.status().websocket_connections[0].sent_text_frames, 0); + record.grants.permissions.retain(|permission| { + *permission != PluginPermission::host_api(PluginHostApi::WebSocket) + }); + let denied_handle = PluginInstanceHandle::new_with_test_websocket_client( + record, + Arc::new(ServiceWebSocketClient::default()), + ) + .unwrap(); + wait_until(|| !denied_handle.status().diagnostics.is_empty()); + assert!(denied_handle.status().diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceWebSocketError + && diagnostic.message.contains("not granted") + })); + handle.stop().unwrap(); + denied_handle.stop().unwrap(); + } + + #[test] + fn service_websocket_driver_reports_receive_error_as_diagnostic() { + let client = ServiceWebSocketClient::with_events(vec![Err( + "binary frames are not supported by host-owned service websocket".into(), + )]); + let handle = PluginInstanceHandle::new_with_test_websocket_client( + service_websocket_record(), + Arc::new(client), + ) + .unwrap(); + + wait_until(|| { + handle.status().websocket_connections[0].state + == PluginServiceWebSocketConnectionState::Failed + }); + let status = handle.status(); + assert!(status.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceWebSocketError + && diagnostic.message.contains("binary frames") + })); + assert!( + status.websocket_connections[0] + .last_error + .as_deref() + .unwrap() + .contains("binary frames") + ); + handle.stop().unwrap(); + } + #[test] fn service_output_command_rejects_malformed_envelope() { let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); @@ -5752,6 +6686,8 @@ mod tests { dispatch_counters: PluginIngressDispatchCounters::default(), last_error: None, output_command_results: Vec::new(), + service_websockets: PluginServiceWebSocketDriver::default(), + service_websocket_client: Arc::new(TungstenitePluginWebSocketClient), diagnostics: Vec::new(), }))); diff --git a/docs/development/plugin-development.md b/docs/development/plugin-development.md index 75df3828..881ff48e 100644 --- a/docs/development/plugin-development.md +++ b/docs/development/plugin-development.md @@ -329,7 +329,7 @@ Yoi checks method, scheme, host, optional port, and path prefix against both the ## `websocket` host API -The `websocket` host API is a separate grant-gated capability named `host_api.websocket`, not an extension of `host_api.request`. It opens host-owned WebSocket connections only when both the package manifest and enablement config declare matching targets. Plugin code drives the lifecycle explicitly through `open`, `send-text`, `recv`, and `close`; incoming messages are returned only from bounded `recv` calls and are not injected into model context, history, Dashboard state, or Ticket state. +The `websocket` host API is a separate grant-gated capability named `host_api.websocket`, not an extension of `host_api.request`. It opens host-owned WebSocket connections only when both the package manifest and enablement config declare matching targets. Tool-style/internal bounded use can still drive the lifecycle explicitly through `open`, `send-text`, `recv`, and `close`; incoming messages are returned only from bounded `recv` calls and are not injected into model context, history, Dashboard state, or Ticket state. Service Plugins should prefer the host-owned Service WebSocket driver instead of running a long-lived guest recv loop: declare a Service ingress source as `websocket:wss://host/path`, include the `websocket_text`/`websocket_close`/`websocket_error` event kinds you want delivered, and emit the Service output command `websocket_send` to send text back through the same grant-checked host connection. Example manifest shape: