diff --git a/crates/pod/src/feature/plugin.rs b/crates/pod/src/feature/plugin.rs index cf880c5b..e7109092 100644 --- a/crates/pod/src/feature/plugin.rs +++ b/crates/pod/src/feature/plugin.rs @@ -2283,6 +2283,9 @@ const PLUGIN_WASM_MAX_SUMMARY_BYTES: usize = 1024; const PLUGIN_WASM_FUEL: u64 = 5_000_000; const PLUGIN_WASM_TIMEOUT: Duration = Duration::from_secs(1); const PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY: usize = 32; +const PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_COUNT: usize = 16; +const PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_PAYLOAD_BYTES: usize = 16 * 1024; +const PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_RESULTS: usize = 32; #[cfg(test)] const PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT: Duration = Duration::from_millis(25); #[cfg(not(test))] @@ -2447,6 +2450,22 @@ struct PluginWebSocketOpenRequest { headers: Vec, } +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct PluginServiceWebSocketSendCommandPayload { + url: String, + text: String, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct PluginServiceDiagnosticStatusCommandPayload { + #[serde(default)] + message: Option, + #[serde(default)] + status: Option, +} + #[derive(Clone, Debug, Deserialize, Serialize)] struct PluginWebSocketOpenResponse { handle: u32, @@ -2957,6 +2976,9 @@ pub enum PluginInstanceDiagnosticKind { ServiceUnavailable, ServiceFailed, ServiceStopped, + ServiceOutputCommandRecorded, + ServiceOutputCommandRejected, + ServiceOutputCommandUnsupported, } #[derive(Clone, Debug, PartialEq, Eq, Serialize)] @@ -3002,6 +3024,9 @@ pub struct PluginInstanceStatus { pub queue_capacity: usize, pub last_error: Option, pub dispatch_counters: PluginIngressDispatchCounters, + /// Last bounded Service output command outcomes. These are produced only by + /// Service/Ingress dispatch and are intentionally separate from ToolOutput. + pub output_command_results: Vec, pub diagnostics: Vec, } @@ -3035,12 +3060,138 @@ impl PluginIngressEvent { } } +/// Host-validated output command envelope returned by Service/Ingress handlers. +/// +/// Service output commands are intentionally distinct from ordinary plugin +/// `ToolOutput`: handlers can request bounded side effects, but the host parses, +/// validates, grant-checks, records diagnostics, and fail-closes before executing +/// any supported command. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct PluginServiceOutputCommandEnvelope { + pub correlation_id: String, + pub source_event_id: String, + pub command_id: String, + pub kind: PluginServiceOutputCommandKind, + pub payload: Value, + pub requested_at: String, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PluginServiceOutputCommandKind { + DiagnosticStatusUpdate, + HostRequestDispatch, + #[serde(rename = "websocket_send")] + WebSocketSend, +} + +impl PluginServiceOutputCommandKind { + fn as_str(self) -> &'static str { + match self { + Self::DiagnosticStatusUpdate => "diagnostic_status_update", + Self::HostRequestDispatch => "host_request_dispatch", + Self::WebSocketSend => "websocket_send", + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub enum PluginServiceOutputCommandStatus { + Recorded, + Rejected, + Unsupported, +} + +#[derive(Clone, Debug, PartialEq, Serialize)] +pub struct PluginServiceOutputCommandResult { + pub correlation_id: Option, + pub source_event_id: Option, + pub command_id: Option, + pub kind: Option, + pub status: PluginServiceOutputCommandStatus, + pub message: String, + pub recorded_at: String, +} + +impl PluginServiceOutputCommandResult { + fn rejected(message: impl Into) -> Self { + Self::from_parts( + None, + None, + None, + None, + PluginServiceOutputCommandStatus::Rejected, + message, + ) + } + + fn rejected_for( + command: &PluginServiceOutputCommandEnvelope, + message: impl Into, + ) -> Self { + Self::from_command(command, PluginServiceOutputCommandStatus::Rejected, message) + } + + fn unsupported( + command: &PluginServiceOutputCommandEnvelope, + message: impl Into, + ) -> Self { + Self::from_command( + command, + PluginServiceOutputCommandStatus::Unsupported, + message, + ) + } + + fn recorded(command: &PluginServiceOutputCommandEnvelope, message: impl Into) -> Self { + Self::from_command(command, PluginServiceOutputCommandStatus::Recorded, message) + } + + fn from_command( + command: &PluginServiceOutputCommandEnvelope, + status: PluginServiceOutputCommandStatus, + message: impl Into, + ) -> Self { + Self::from_parts( + Some(command.correlation_id.clone()), + Some(command.source_event_id.clone()), + Some(command.command_id.clone()), + Some(command.kind), + status, + message, + ) + } + + fn from_parts( + correlation_id: Option, + source_event_id: Option, + command_id: Option, + kind: Option, + status: PluginServiceOutputCommandStatus, + message: impl Into, + ) -> Self { + Self { + correlation_id, + source_event_id, + command_id, + kind, + status, + message: bounded_message(redact_secret_like(&message.into())), + recorded_at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + } + } +} + #[derive(Clone, Debug, PartialEq, Serialize)] pub struct PluginIngressDispatchReport { pub plugin_ref: String, pub ingress: String, pub accepted: bool, pub output: Value, + /// Results of host-side Service output command parsing/validation/grant-checking. + /// This path never feeds ordinary plugin ToolOutput handling. + pub output_command_results: Vec, pub queue_depth: usize, pub dispatch_counters: PluginIngressDispatchCounters, pub diagnostics: Vec, @@ -3185,6 +3336,7 @@ impl PluginInstanceHandle { ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY, dispatch_counters: PluginIngressDispatchCounters::default(), last_error: None, + output_command_results: Vec::new(), diagnostics: Vec::new(), }; instance.start()?; @@ -3237,6 +3389,7 @@ struct PluginInstance { ingress_queue_capacity: usize, dispatch_counters: PluginIngressDispatchCounters, last_error: Option, + output_command_results: Vec, diagnostics: Vec, } @@ -3507,10 +3660,12 @@ impl PluginInstance { ingress_name: &str, event: PluginIngressEvent, ) -> Result { - match &mut self.runtime { - PluginInstanceRuntime::ComponentToolAdapter => Err(PluginWasmError::Module( - "component tool runtime does not expose ingress dispatch".to_string(), - )), + let output = match &mut self.runtime { + PluginInstanceRuntime::ComponentToolAdapter => { + return Err(PluginWasmError::Module( + "component tool runtime does not expose ingress dispatch".to_string(), + )); + } #[cfg(test)] PluginInstanceRuntime::TestIngress { tool_calls, @@ -3525,40 +3680,40 @@ impl PluginInstance { )); } *ingress_calls += 1; - let output = serde_json::json!({ + let mut output = serde_json::json!({ "ingress": ingress_name, - "kind": event.kind, - "source": event.source, - "ingress_name": event.ingress_name, + "kind": event.kind.clone(), + "source": event.source.clone(), + "ingress_name": event.ingress_name.clone(), "attempt": event.attempt, - "correlation_id": event.correlation_id, + "correlation_id": event.correlation_id.clone(), "calls": *tool_calls, "ingress_calls": *ingress_calls, - "payload": event.payload, + "payload": event.payload.clone(), }); - Ok(PluginIngressDispatchReport { - plugin_ref: self.record.identity.to_string(), - ingress: ingress_name.to_string(), - accepted: true, - output, - queue_depth: self.ingress_queue.len(), - dispatch_counters: self.dispatch_counters.clone(), - diagnostics: self.diagnostics.clone(), - }) + if let (Some(map), Some(commands)) = ( + output.as_object_mut(), + event.payload.get("output_commands").cloned(), + ) { + map.insert("output_commands".to_string(), commands); + } + output } PluginInstanceRuntime::ComponentInstance(runtime) => { - let output = runtime.handle_ingress(ingress_name, &event)?; - Ok(PluginIngressDispatchReport { - plugin_ref: self.record.identity.to_string(), - ingress: ingress_name.to_string(), - accepted: true, - output, - queue_depth: self.ingress_queue.len(), - dispatch_counters: self.dispatch_counters.clone(), - diagnostics: self.diagnostics.clone(), - }) + runtime.handle_ingress(ingress_name, &event)? } - } + }; + let output_command_results = self.process_service_output_commands(&output, &event); + Ok(PluginIngressDispatchReport { + plugin_ref: self.record.identity.to_string(), + ingress: ingress_name.to_string(), + accepted: true, + output, + output_command_results, + queue_depth: self.ingress_queue.len(), + dispatch_counters: self.dispatch_counters.clone(), + diagnostics: self.diagnostics.clone(), + }) } fn stop(&mut self) -> Result<(), PluginWasmError> { @@ -3608,6 +3763,7 @@ impl PluginInstance { queue_capacity: self.ingress_queue_capacity, last_error: self.last_error.clone(), dispatch_counters: self.dispatch_counters.clone(), + output_command_results: self.output_command_results.clone(), diagnostics: self.diagnostics.clone(), } } @@ -3656,6 +3812,262 @@ impl PluginInstance { } } + fn process_service_output_commands( + &mut self, + output: &Value, + event: &PluginIngressEvent, + ) -> Vec { + let results = match self.validate_service_output_commands(output, event) { + Ok(commands) => { + let mut results = Vec::with_capacity(commands.len()); + for command in commands { + results.push(self.execute_service_output_command(command)); + } + results + } + Err(results) => results, + }; + self.record_service_output_command_results(&results); + results + } + + fn validate_service_output_commands( + &self, + output: &Value, + event: &PluginIngressEvent, + ) -> Result, Vec> + { + let Some(values) = output.get("output_commands") else { + return Ok(Vec::new()); + }; + let Some(values) = values.as_array() else { + return Err(vec![PluginServiceOutputCommandResult::rejected( + "service output_commands must be an array", + )]); + }; + if values.len() > PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_COUNT { + return Err(vec![PluginServiceOutputCommandResult::rejected(format!( + "service output_commands exceeds {} commands", + PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_COUNT + ))]); + } + + let mut commands = Vec::with_capacity(values.len()); + let mut rejected = Vec::new(); + for value in values { + let command: PluginServiceOutputCommandEnvelope = + match serde_json::from_value(value.clone()) { + Ok(command) => command, + Err(error) => { + rejected.push(PluginServiceOutputCommandResult::rejected(format!( + "malformed service output command envelope: {error}" + ))); + continue; + } + }; + if let Err(message) = self.validate_service_output_command_envelope(&command, event) { + rejected.push(PluginServiceOutputCommandResult::rejected_for( + &command, message, + )); + continue; + } + if let Err(message) = self.grant_check_service_output_command(&command) { + rejected.push(PluginServiceOutputCommandResult::rejected_for( + &command, message, + )); + continue; + } + commands.push(command); + } + + if rejected.is_empty() { + Ok(commands) + } else { + Err(rejected) + } + } + + fn validate_service_output_command_envelope( + &self, + command: &PluginServiceOutputCommandEnvelope, + event: &PluginIngressEvent, + ) -> Result<(), String> { + validate_output_command_id("correlation_id", &command.correlation_id)?; + validate_output_command_id("source_event_id", &command.source_event_id)?; + validate_output_command_id("command_id", &command.command_id)?; + if command.source_event_id != event.correlation_id { + return Err("source_event_id must match the ingress event correlation_id".to_string()); + } + chrono::DateTime::parse_from_rfc3339(&command.requested_at) + .map_err(|error| format!("requested_at must be RFC3339: {error}"))?; + let payload_bytes = serde_json::to_vec(&command.payload) + .map_err(|error| format!("payload is not serializable JSON: {error}"))?; + if payload_bytes.len() > PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_PAYLOAD_BYTES { + return Err(format!( + "payload exceeds {} bytes", + PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_PAYLOAD_BYTES + )); + } + match command.kind { + PluginServiceOutputCommandKind::DiagnosticStatusUpdate => { + serde_json::from_value::( + command.payload.clone(), + ) + .map_err(|error| format!("invalid diagnostic_status_update payload: {error}"))?; + } + PluginServiceOutputCommandKind::HostRequestDispatch => { + let request: PluginRequestRequest = serde_json::from_value(command.payload.clone()) + .map_err(|error| format!("invalid host_request_dispatch payload: {error}"))?; + validate_plugin_request_request(&self.record, &request) + .map_err(|error| format!("host_request_dispatch target denied: {}", error.0))?; + } + PluginServiceOutputCommandKind::WebSocketSend => { + let payload: PluginServiceWebSocketSendCommandPayload = + serde_json::from_value(command.payload.clone()) + .map_err(|error| format!("invalid websocket_send payload: {error}"))?; + self.validate_service_websocket_send_payload(&payload)?; + } + } + Ok(()) + } + + fn grant_check_service_output_command( + &self, + command: &PluginServiceOutputCommandEnvelope, + ) -> Result<(), String> { + match command.kind { + PluginServiceOutputCommandKind::DiagnosticStatusUpdate => Ok(()), + PluginServiceOutputCommandKind::HostRequestDispatch => { + authorize_plugin_host_api(&self.record, PluginHostApi::Request) + .map_err(|error| format!("host_request_dispatch not granted: {}", error.0)) + } + PluginServiceOutputCommandKind::WebSocketSend => { + authorize_plugin_host_api(&self.record, PluginHostApi::WebSocket) + .map_err(|error| format!("websocket_send not granted: {}", error.0)) + } + } + } + + fn validate_service_websocket_send_payload( + &self, + payload: &PluginServiceWebSocketSendCommandPayload, + ) -> Result<(), String> { + if payload.text.len() > PLUGIN_WEBSOCKET_MAX_MESSAGE_BYTES { + return Err(format!( + "websocket_send text exceeds {} bytes", + PLUGIN_WEBSOCKET_MAX_MESSAGE_BYTES + )); + } + let url = reqwest::Url::parse(&payload.url) + .map_err(|error| format!("invalid WebSocket URL: {error}"))?; + match url.scheme() { + "ws" | "wss" => {} + "http" | "https" => { + return Err("HTTP URLs are not supported by websocket_send".to_string()); + } + scheme => { + return Err(format!( + "unsupported WebSocket URL scheme {scheme:?}; only ws and wss are allowed" + )); + } + } + if url.host_str().is_none() { + return Err("WebSocket URL must include a host".to_string()); + } + if !url.username().is_empty() || url.password().is_some() { + return Err("WebSocket URLs with embedded credentials are not allowed".to_string()); + } + validate_static_request_target(&url).map_err(|error| error.0)?; + authorize_websocket_allowlist(&self.record, &url).map_err(|error| error.0)?; + Ok(()) + } + + fn execute_service_output_command( + &mut self, + command: PluginServiceOutputCommandEnvelope, + ) -> PluginServiceOutputCommandResult { + match command.kind { + PluginServiceOutputCommandKind::DiagnosticStatusUpdate => { + let payload: PluginServiceDiagnosticStatusCommandPayload = + match serde_json::from_value(command.payload.clone()) { + Ok(payload) => payload, + Err(error) => { + return PluginServiceOutputCommandResult::rejected_for( + &command, + format!("invalid diagnostic_status_update payload: {error}"), + ); + } + }; + if let Some(status) = payload.status { + self.component_status = Some(status); + } + let message = payload + .message + .as_deref() + .map(bounded_message) + .unwrap_or_else(|| "plugin service status update recorded".to_string()); + PluginServiceOutputCommandResult::recorded(&command, message) + } + PluginServiceOutputCommandKind::HostRequestDispatch => { + PluginServiceOutputCommandResult::unsupported( + &command, + "host_request_dispatch output command is grant-checked but transport dispatch is unsupported in v0", + ) + } + PluginServiceOutputCommandKind::WebSocketSend => { + PluginServiceOutputCommandResult::unsupported( + &command, + "websocket_send output command is grant-checked but WebSocket send transport is unsupported in v0", + ) + } + } + } + + fn record_service_output_command_results( + &mut self, + results: &[PluginServiceOutputCommandResult], + ) { + if results.is_empty() { + return; + } + for result in results { + let kind = match result.status { + PluginServiceOutputCommandStatus::Recorded => { + PluginInstanceDiagnosticKind::ServiceOutputCommandRecorded + } + PluginServiceOutputCommandStatus::Rejected => { + PluginInstanceDiagnosticKind::ServiceOutputCommandRejected + } + PluginServiceOutputCommandStatus::Unsupported => { + PluginInstanceDiagnosticKind::ServiceOutputCommandUnsupported + } + }; + let command_label = result.command_id.as_deref().unwrap_or(""); + let command_kind = result + .kind + .map(PluginServiceOutputCommandKind::as_str) + .unwrap_or("unknown"); + let message = bounded_message(format!( + "service output command {command_label} ({command_kind}): {}", + result.message + )); + if !matches!(result.status, PluginServiceOutputCommandStatus::Recorded) { + self.last_error = Some(message.clone()); + } + self.diagnostics.push(PluginInstanceDiagnostic::with_kind( + kind, + self.lifecycle.clone(), + message, + )); + } + self.output_command_results.extend_from_slice(results); + if self.output_command_results.len() > PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_RESULTS { + let keep_from = + self.output_command_results.len() - PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_RESULTS; + self.output_command_results.drain(0..keep_from); + } + } + fn record_runtime_error( &mut self, kind: PluginInstanceDiagnosticKind, @@ -4413,6 +4825,15 @@ fn bounded_message(message: impl Into) -> String { sanitized } +fn validate_output_command_id(field: &str, value: &str) -> Result<(), String> { + if value.is_empty() || value.len() > 128 || value.chars().any(char::is_control) { + return Err(format!( + "{field} is empty, too long, or contains control characters" + )); + } + Ok(()) +} + fn validate_declared_tool_names(record: &ResolvedPluginRecord) -> Result<(), FeatureInstallError> { let mut seen = HashSet::new(); for tool in &record.manifest.tools { @@ -4818,6 +5239,51 @@ mod tests { PluginIngressEvent::new(ingress_name, "test", "unit", payload) } + fn service_output_command( + event: &PluginIngressEvent, + command_id: &str, + kind: &str, + payload: Value, + ) -> Value { + json!({ + "correlation_id": event.correlation_id.clone(), + "source_event_id": event.correlation_id.clone(), + "command_id": command_id, + "kind": kind, + "payload": payload, + "requested_at": event.created_at.clone(), + }) + } + + fn add_request_output_grant(record: &mut ResolvedPluginRecord) { + let permission = PluginPermission::host_api(PluginHostApi::Request); + record.manifest.permissions.push(permission.clone()); + record.grants.permissions.push(permission); + let target = PluginRequestGrant { + scheme: "https".to_string(), + host: "api.example.test".to_string(), + port: None, + methods: vec!["POST".to_string()], + path_prefixes: vec!["/v1".to_string()], + }; + record.manifest.request.push(target.clone()); + record.grants.request.push(target); + } + + fn add_websocket_output_grant(record: &mut ResolvedPluginRecord) { + let permission = PluginPermission::host_api(PluginHostApi::WebSocket); + record.manifest.permissions.push(permission.clone()); + record.grants.permissions.push(permission); + let target = PluginWebSocketGrant { + scheme: "wss".to_string(), + host: "ws.example.test".to_string(), + port: None, + path_prefixes: vec!["/events".to_string()], + }; + record.manifest.websocket.push(target.clone()); + record.grants.websocket.push(target); + } + #[test] fn service_selected_ignores_unselected_tool_without_grants() { let mut record = record(vec![tool("hidden_tool")]); @@ -5060,6 +5526,147 @@ mod tests { ); } + #[test] + fn service_output_command_records_diagnostic_status_separately_from_tool_output() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + let mut event = test_ingress_event("shared_ingress", json!({})); + let command = service_output_command( + &event, + "cmd-status", + "diagnostic_status_update", + json!({ + "message": "service became ready", + "status": {"ready": true} + }), + ); + event.payload = json!({"output_commands": [command]}); + + let report = handle.deliver_ingress("shared_ingress", event).unwrap(); + + assert_eq!(report.output_command_results.len(), 1); + assert_eq!( + report.output_command_results[0].kind, + Some(PluginServiceOutputCommandKind::DiagnosticStatusUpdate) + ); + assert_eq!( + report.output_command_results[0].status, + PluginServiceOutputCommandStatus::Recorded + ); + assert_eq!( + report.output["payload"]["output_commands"] + .as_array() + .unwrap() + .len(), + 1 + ); + let status = handle.status(); + assert_eq!(status.component_status, Some(json!({"ready": true}))); + assert_eq!(status.output_command_results.len(), 1); + assert!(status.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandRecorded + && diagnostic.message.contains("cmd-status") + })); + } + + #[test] + fn service_output_command_rejects_ungranted_request_without_executing_status_update() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + let mut event = test_ingress_event("shared_ingress", json!({})); + let status_command = service_output_command( + &event, + "cmd-status", + "diagnostic_status_update", + json!({"status": {"should_not_record": true}}), + ); + let request_command = service_output_command( + &event, + "cmd-request", + "host_request_dispatch", + json!({ + "method": "POST", + "url": "https://api.example.test/v1/events" + }), + ); + event.payload = json!({"output_commands": [status_command, request_command]}); + + let report = handle.deliver_ingress("shared_ingress", event).unwrap(); + + assert_eq!(report.output_command_results.len(), 1); + assert_eq!( + report.output_command_results[0].status, + PluginServiceOutputCommandStatus::Rejected + ); + assert_eq!(handle.status().component_status, None); + assert!(handle.status().diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandRejected + && diagnostic.message.contains("cmd-request") + })); + } + + #[test] + fn service_output_command_placeholders_are_grant_checked_and_unsupported() { + let mut record = test_service_ingress_record(); + add_request_output_grant(&mut record); + add_websocket_output_grant(&mut record); + let handle = PluginInstanceHandle::new(record).unwrap(); + let mut event = test_ingress_event("shared_ingress", json!({})); + let request_command = service_output_command( + &event, + "cmd-request", + "host_request_dispatch", + json!({ + "method": "POST", + "url": "https://api.example.test/v1/events" + }), + ); + let websocket_command = service_output_command( + &event, + "cmd-websocket", + "websocket_send", + json!({ + "url": "wss://ws.example.test/events", + "text": "hello" + }), + ); + event.payload = json!({"output_commands": [request_command, websocket_command]}); + + let report = handle.deliver_ingress("shared_ingress", event).unwrap(); + + assert_eq!(report.output_command_results.len(), 2); + assert!( + report + .output_command_results + .iter() + .all(|result| { result.status == PluginServiceOutputCommandStatus::Unsupported }) + ); + let status = handle.status(); + assert_eq!(status.output_command_results.len(), 2); + assert!(status.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandUnsupported + && diagnostic.message.contains("cmd-websocket") + })); + } + + #[test] + fn service_output_command_rejects_malformed_envelope() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + let mut event = test_ingress_event("shared_ingress", json!({})); + event.payload = json!({"output_commands": [{"kind": "diagnostic_status_update"}]}); + + let report = handle.deliver_ingress("shared_ingress", event).unwrap(); + + assert_eq!(report.output_command_results.len(), 1); + assert_eq!( + report.output_command_results[0].status, + PluginServiceOutputCommandStatus::Rejected + ); + assert!( + report.output_command_results[0] + .message + .contains("malformed service output command envelope") + ); + } + #[test] fn installed_ingress_dispatch_uses_retained_shared_instance() { let mut record = record(vec![tool("shared_tool")]); @@ -5144,6 +5751,7 @@ mod tests { ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY, dispatch_counters: PluginIngressDispatchCounters::default(), last_error: None, + output_command_results: Vec::new(), diagnostics: Vec::new(), })));