diff --git a/crates/pod/src/feature/plugin.rs b/crates/pod/src/feature/plugin.rs index ca05dddd..cf880c5b 100644 --- a/crates/pod/src/feature/plugin.rs +++ b/crates/pod/src/feature/plugin.rs @@ -7,7 +7,7 @@ //! host APIs through explicit imports with matching permissions and scoped //! allowlist grants. -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fs; use std::io::{Read as _, Write as _}; use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; @@ -103,16 +103,18 @@ impl PluginToolFeature { &self, ingress_name: &str, event: PluginIngressEvent, - ) -> Result { + ) -> Result { if !surface_enabled(&self.record, PluginSurface::Ingress) { - return Err(PluginWasmError::Module( + return Err(PluginIngressDispatchError::InvalidEvent( "plugin ingress surface is not enabled".to_string(), )); } let handle = self .registry .handle(&self.record.identity.to_string()) - .ok_or_else(|| PluginWasmError::Module("plugin instance is not started".to_string()))?; + .ok_or_else(|| PluginIngressDispatchError::ServiceUnavailable { + state: PluginInstanceLifecycleState::Stopped, + })?; handle.deliver_ingress(ingress_name, event) } @@ -2280,6 +2282,11 @@ const PLUGIN_WASM_MAX_OUTPUT_BYTES: usize = 64 * 1024; const PLUGIN_WASM_MAX_SUMMARY_BYTES: usize = 1024; const PLUGIN_WASM_FUEL: u64 = 5_000_000; const PLUGIN_WASM_TIMEOUT: Duration = Duration::from_secs(1); +const PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY: usize = 32; +#[cfg(test)] +const PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT: Duration = Duration::from_millis(25); +#[cfg(not(test))] +const PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT: Duration = Duration::from_secs(1); const PLUGIN_WASM_MEMORY_BYTES: usize = 2 * 1024 * 1024; const PLUGIN_WASM_TABLE_ELEMENTS: usize = 256; const PLUGIN_REQUEST_MAX_REQUEST_BYTES: usize = 48 * 1024; @@ -2933,31 +2940,68 @@ impl PluginRequestError { #[derive(Clone, Debug, PartialEq, Eq, Serialize)] pub enum PluginInstanceLifecycleState { Ready, - Started, + Starting, + Running, + Stopping, Stopped, Failed, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub enum PluginInstanceDiagnosticKind { + Lifecycle, + InvalidEvent, + QueueFull, + DispatchTimeout, + DispatchFailed, + ServiceUnavailable, + ServiceFailed, + ServiceStopped, +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize)] pub struct PluginInstanceDiagnostic { + pub kind: PluginInstanceDiagnosticKind, pub state: PluginInstanceLifecycleState, pub message: String, } impl PluginInstanceDiagnostic { pub fn new(state: PluginInstanceLifecycleState, message: impl Into) -> Self { + Self::with_kind(PluginInstanceDiagnosticKind::Lifecycle, state, message) + } + + pub fn with_kind( + kind: PluginInstanceDiagnosticKind, + state: PluginInstanceLifecycleState, + message: impl Into, + ) -> Self { Self { + kind, state, - message: bounded_message(message.into()), + message: bounded_message(redact_secret_like(&message.into())), } } } +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +pub struct PluginIngressDispatchCounters { + pub enqueued: u64, + pub dispatched: u64, + pub rejected: u64, + pub failed: u64, + pub timed_out: u64, +} + #[derive(Clone, Debug, PartialEq, Serialize)] pub struct PluginInstanceStatus { pub plugin_ref: String, pub lifecycle: PluginInstanceLifecycleState, pub component_status: Option, + pub queue_depth: usize, + pub queue_capacity: usize, + pub last_error: Option, + pub dispatch_counters: PluginIngressDispatchCounters, pub diagnostics: Vec, } @@ -2965,7 +3009,30 @@ pub struct PluginInstanceStatus { pub struct PluginIngressEvent { pub kind: String, pub source: String, + pub ingress_name: String, pub payload: Value, + pub created_at: String, + pub attempt: u32, + pub correlation_id: String, +} + +impl PluginIngressEvent { + pub fn new( + ingress_name: impl Into, + kind: impl Into, + source: impl Into, + payload: Value, + ) -> Self { + Self { + kind: kind.into(), + source: source.into(), + ingress_name: ingress_name.into(), + payload, + created_at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + attempt: 1, + correlation_id: uuid::Uuid::now_v7().to_string(), + } + } } #[derive(Clone, Debug, PartialEq, Serialize)] @@ -2974,9 +3041,80 @@ pub struct PluginIngressDispatchReport { pub ingress: String, pub accepted: bool, pub output: Value, + pub queue_depth: usize, + pub dispatch_counters: PluginIngressDispatchCounters, pub diagnostics: Vec, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum PluginIngressDispatchError { + InvalidEvent(String), + QueueFull { capacity: usize }, + ServiceUnavailable { state: PluginInstanceLifecycleState }, + ServiceFailed(String), + ServiceStopped { state: PluginInstanceLifecycleState }, + DispatchTimeout { timeout: Duration }, + DispatchFailed(String), +} + +impl PluginIngressDispatchError { + fn bounded_message(&self) -> String { + match self { + Self::InvalidEvent(message) => bounded_message(format!( + "invalid plugin ingress event: {}", + redact_secret_like(message) + )), + Self::QueueFull { capacity } => bounded_message(format!( + "plugin ingress queue is full (capacity {capacity})" + )), + Self::ServiceUnavailable { state } => bounded_message(format!( + "plugin service is not running for ingress dispatch (state {state:?})" + )), + Self::ServiceFailed(message) => bounded_message(format!( + "plugin service is failed for ingress dispatch: {}", + redact_secret_like(message) + )), + Self::ServiceStopped { state } => bounded_message(format!( + "plugin service rejects ingress while stopping/stopped (state {state:?})" + )), + Self::DispatchTimeout { timeout } => bounded_message(format!( + "plugin ingress dispatch timed out after {timeout:?}" + )), + Self::DispatchFailed(message) => bounded_message(format!( + "plugin ingress dispatch failed closed: {}", + redact_secret_like(message) + )), + } + } + + fn diagnostic_kind(&self) -> PluginInstanceDiagnosticKind { + match self { + Self::InvalidEvent(_) => PluginInstanceDiagnosticKind::InvalidEvent, + Self::QueueFull { .. } => PluginInstanceDiagnosticKind::QueueFull, + Self::ServiceUnavailable { .. } => PluginInstanceDiagnosticKind::ServiceUnavailable, + Self::ServiceFailed(_) => PluginInstanceDiagnosticKind::ServiceFailed, + Self::ServiceStopped { .. } => PluginInstanceDiagnosticKind::ServiceStopped, + Self::DispatchTimeout { .. } => PluginInstanceDiagnosticKind::DispatchTimeout, + Self::DispatchFailed(_) => PluginInstanceDiagnosticKind::DispatchFailed, + } + } +} + +impl std::fmt::Display for PluginIngressDispatchError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.bounded_message()) + } +} + +impl std::error::Error for PluginIngressDispatchError {} + +#[derive(Clone, Debug)] +struct QueuedPluginIngress { + ingress_name: String, + event: PluginIngressEvent, + enqueued_at: Instant, +} + #[derive(Clone, Default)] pub struct PluginInstanceRegistry { instances: Arc>>, @@ -3043,6 +3181,10 @@ impl PluginInstanceHandle { runtime, lifecycle: PluginInstanceLifecycleState::Ready, component_status: None, + ingress_queue: VecDeque::new(), + ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY, + dispatch_counters: PluginIngressDispatchCounters::default(), + last_error: None, diagnostics: Vec::new(), }; instance.start()?; @@ -3060,11 +3202,10 @@ impl PluginInstanceHandle { &self, ingress_name: &str, event: PluginIngressEvent, - ) -> Result { - self.0 - .lock() - .expect("plugin instance poisoned") - .deliver_ingress(ingress_name, event) + ) -> Result { + let mut instance = self.0.lock().expect("plugin instance poisoned"); + instance.enqueue_ingress(ingress_name, event)?; + instance.dispatch_next_ingress() } pub fn status(&self) -> PluginInstanceStatus { @@ -3081,6 +3222,7 @@ impl PluginInstanceHandle { fn record_diagnostic(&self, diagnostic: PluginInstanceDiagnostic) { if let Ok(mut instance) = self.0.lock() { instance.lifecycle = diagnostic.state.clone(); + instance.last_error = Some(diagnostic.message.clone()); instance.diagnostics.push(diagnostic); } } @@ -3091,28 +3233,56 @@ struct PluginInstance { runtime: PluginInstanceRuntime, lifecycle: PluginInstanceLifecycleState, component_status: Option, + ingress_queue: VecDeque, + ingress_queue_capacity: usize, + dispatch_counters: PluginIngressDispatchCounters, + last_error: Option, diagnostics: Vec, } impl PluginInstance { fn start(&mut self) -> Result<(), PluginWasmError> { - match &mut self.runtime { + self.lifecycle = PluginInstanceLifecycleState::Starting; + let start_result = match &mut self.runtime { PluginInstanceRuntime::ComponentToolAdapter => { self.lifecycle = PluginInstanceLifecycleState::Ready; self.diagnostics.push(PluginInstanceDiagnostic::new( PluginInstanceLifecycleState::Ready, "component tool runtime registered behind PluginInstanceRegistry", )); + Ok(()) } #[cfg(test)] PluginInstanceRuntime::TestIngress { .. } => { - self.lifecycle = PluginInstanceLifecycleState::Started; + self.lifecycle = PluginInstanceLifecycleState::Running; + self.diagnostics.push(PluginInstanceDiagnostic::new( + PluginInstanceLifecycleState::Running, + "test ingress runtime initialized", + )); + Ok(()) } PluginInstanceRuntime::ComponentInstance(runtime) => { - let status = runtime.start(&self.record)?; - self.component_status = Some(status); - self.lifecycle = PluginInstanceLifecycleState::Started; + match runtime.start(&self.record) { + Ok(status) => { + self.component_status = Some(status); + self.lifecycle = PluginInstanceLifecycleState::Running; + self.diagnostics.push(PluginInstanceDiagnostic::new( + PluginInstanceLifecycleState::Running, + "component instance start returned; host-managed ingress queue is running", + )); + Ok(()) + } + Err(error) => Err(error), + } } + }; + if let Err(error) = start_result { + self.lifecycle = PluginInstanceLifecycleState::Failed; + self.record_runtime_error( + PluginInstanceDiagnosticKind::ServiceFailed, + format!("plugin component start failed: {}", error.bounded_message()), + ); + return Err(error); } Ok(()) } @@ -3149,10 +3319,10 @@ impl PluginInstance { run_plugin_component_tool(self.record.clone(), tool_name.to_string(), input) } #[cfg(test)] - PluginInstanceRuntime::TestIngress { calls } => { - *calls += 1; + PluginInstanceRuntime::TestIngress { tool_calls, .. } => { + *tool_calls += 1; Ok(ToolOutput { - summary: format!("{tool_name}: {calls}"), + summary: format!("{tool_name}: {tool_calls}"), content: Some(String::from_utf8_lossy(&input).to_string()), }) } @@ -3162,53 +3332,208 @@ impl PluginInstance { } } - fn deliver_ingress( + fn enqueue_ingress( &mut self, ingress_name: &str, event: PluginIngressEvent, - ) -> Result { + ) -> Result<(), PluginIngressDispatchError> { + self.validate_ingress_event(ingress_name, &event) + .map_err(|error| self.record_rejection(error))?; + if self.ingress_queue.len() >= self.ingress_queue_capacity { + let error = PluginIngressDispatchError::QueueFull { + capacity: self.ingress_queue_capacity, + }; + return Err(self.record_rejection(error)); + } + self.ingress_queue.push_back(QueuedPluginIngress { + ingress_name: ingress_name.to_string(), + event, + enqueued_at: Instant::now(), + }); + self.dispatch_counters.enqueued += 1; + Ok(()) + } + + fn validate_ingress_event( + &self, + ingress_name: &str, + event: &PluginIngressEvent, + ) -> Result<(), PluginIngressDispatchError> { if !surface_enabled(&self.record, PluginSurface::Ingress) { - return Err(PluginWasmError::Module( + return Err(PluginIngressDispatchError::InvalidEvent( "plugin ingress surface is not enabled".to_string(), )); } - if serde_json::to_vec(&event) + match self.lifecycle { + PluginInstanceLifecycleState::Running => {} + PluginInstanceLifecycleState::Failed => { + return Err(PluginIngressDispatchError::ServiceFailed( + self.last_error + .clone() + .unwrap_or_else(|| "service is failed".to_string()), + )); + } + PluginInstanceLifecycleState::Stopping | PluginInstanceLifecycleState::Stopped => { + return Err(PluginIngressDispatchError::ServiceStopped { + state: self.lifecycle.clone(), + }); + } + _ => { + return Err(PluginIngressDispatchError::ServiceUnavailable { + state: self.lifecycle.clone(), + }); + } + } + if event.source.trim().is_empty() { + return Err(PluginIngressDispatchError::InvalidEvent( + "source must not be empty".to_string(), + )); + } + if event.ingress_name != ingress_name { + return Err(PluginIngressDispatchError::InvalidEvent(format!( + "event ingress `{}` does not match dispatch ingress `{ingress_name}`", + event.ingress_name + ))); + } + if event.kind.trim().is_empty() { + return Err(PluginIngressDispatchError::InvalidEvent( + "event kind must not be empty".to_string(), + )); + } + if event.created_at.trim().is_empty() { + return Err(PluginIngressDispatchError::InvalidEvent( + "created_at must not be empty".to_string(), + )); + } + if event.attempt == 0 { + return Err(PluginIngressDispatchError::InvalidEvent( + "attempt must be greater than zero".to_string(), + )); + } + if event.correlation_id.trim().is_empty() { + return Err(PluginIngressDispatchError::InvalidEvent( + "correlation_id must not be empty".to_string(), + )); + } + if serde_json::to_vec(event) .map(|bytes| bytes.len()) .unwrap_or(usize::MAX) > PLUGIN_WASM_MAX_INPUT_BYTES { - return Err(PluginWasmError::Module(format!( + return Err(PluginIngressDispatchError::InvalidEvent(format!( "plugin ingress event exceeds {} bytes", PLUGIN_WASM_MAX_INPUT_BYTES ))); } - self.record + let ingress = self + .record .manifest .ingresses .iter() .find(|ingress| ingress.name == ingress_name) .ok_or_else(|| { - PluginWasmError::Module( + PluginIngressDispatchError::InvalidEvent( "requested ingress is not declared by plugin manifest".to_string(), ) })?; + if !ingress.event_kinds.is_empty() + && !ingress.event_kinds.iter().any(|kind| kind == &event.kind) + { + return Err(PluginIngressDispatchError::InvalidEvent(format!( + "event kind `{}` is not declared for ingress `{ingress_name}`", + event.kind + ))); + } authorize_plugin_ingress(&self.record, ingress_name).map_err(|error| { - PluginWasmError::Module(format!( + PluginIngressDispatchError::InvalidEvent(format!( "plugin ingress permission denied: {}", error.bounded_message() )) })?; + Ok(()) + } + + fn dispatch_next_ingress( + &mut self, + ) -> Result { + let Some(queued) = self.ingress_queue.pop_front() else { + return Err(PluginIngressDispatchError::InvalidEvent( + "plugin ingress queue is empty".to_string(), + )); + }; + let started_at = Instant::now(); + let result = self.dispatch_ingress_now(&queued.ingress_name, queued.event); + let elapsed = started_at.elapsed(); + if elapsed > PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT { + let error = PluginIngressDispatchError::DispatchTimeout { + timeout: PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT, + }; + self.dispatch_counters.timed_out += 1; + self.dispatch_counters.failed += 1; + self.lifecycle = PluginInstanceLifecycleState::Failed; + self.record_dispatch_error(&error); + return Err(error); + } + match result { + Ok(mut report) => { + let queue_latency_ms = queued.enqueued_at.elapsed().as_millis() as u64; + if report.output.get("queue_latency_ms").is_none() { + if let Some(map) = report.output.as_object_mut() { + map.insert( + "queue_latency_ms".to_string(), + Value::from(queue_latency_ms), + ); + } + } + self.dispatch_counters.dispatched += 1; + report.queue_depth = self.ingress_queue.len(); + report.dispatch_counters = self.dispatch_counters.clone(); + report.diagnostics = self.diagnostics.clone(); + Ok(report) + } + Err(error) => { + self.dispatch_counters.failed += 1; + self.lifecycle = PluginInstanceLifecycleState::Failed; + let dispatch_error = + PluginIngressDispatchError::DispatchFailed(error.bounded_message()); + self.record_dispatch_error(&dispatch_error); + Err(dispatch_error) + } + } + } + + fn dispatch_ingress_now( + &mut self, + ingress_name: &str, + event: PluginIngressEvent, + ) -> Result { match &mut self.runtime { PluginInstanceRuntime::ComponentToolAdapter => Err(PluginWasmError::Module( "component tool runtime does not expose ingress dispatch".to_string(), )), #[cfg(test)] - PluginInstanceRuntime::TestIngress { calls } => { + PluginInstanceRuntime::TestIngress { + tool_calls, + ingress_calls, + } => { + if let Some(sleep_ms) = event.payload.get("sleep_ms").and_then(Value::as_u64) { + std::thread::sleep(Duration::from_millis(sleep_ms)); + } + if event.payload.get("fail").and_then(Value::as_bool) == Some(true) { + return Err(PluginWasmError::Execution( + "test ingress requested failure".to_string(), + )); + } + *ingress_calls += 1; let output = serde_json::json!({ "ingress": ingress_name, "kind": event.kind, "source": event.source, - "calls": *calls, + "ingress_name": event.ingress_name, + "attempt": event.attempt, + "correlation_id": event.correlation_id, + "calls": *tool_calls, + "ingress_calls": *ingress_calls, "payload": event.payload, }); Ok(PluginIngressDispatchReport { @@ -3216,6 +3541,8 @@ impl PluginInstance { ingress: ingress_name.to_string(), accepted: true, output, + queue_depth: self.ingress_queue.len(), + dispatch_counters: self.dispatch_counters.clone(), diagnostics: self.diagnostics.clone(), }) } @@ -3226,6 +3553,8 @@ impl PluginInstance { ingress: ingress_name.to_string(), accepted: true, output, + queue_depth: self.ingress_queue.len(), + dispatch_counters: self.dispatch_counters.clone(), diagnostics: self.diagnostics.clone(), }) } @@ -3233,15 +3562,40 @@ impl PluginInstance { } fn stop(&mut self) -> Result<(), PluginWasmError> { - match &mut self.runtime { - PluginInstanceRuntime::ComponentToolAdapter => {} + if self.lifecycle == PluginInstanceLifecycleState::Stopped { + return Ok(()); + } + self.lifecycle = PluginInstanceLifecycleState::Stopping; + self.diagnostics.push(PluginInstanceDiagnostic::new( + PluginInstanceLifecycleState::Stopping, + "plugin service stop requested; ingress queue is closed", + )); + self.ingress_queue.clear(); + let stop_result = match &mut self.runtime { + PluginInstanceRuntime::ComponentToolAdapter => Ok(()), #[cfg(test)] - PluginInstanceRuntime::TestIngress { .. } => {} - PluginInstanceRuntime::ComponentInstance(runtime) => { - self.component_status = Some(runtime.stop()?); - } + PluginInstanceRuntime::TestIngress { .. } => Ok(()), + PluginInstanceRuntime::ComponentInstance(runtime) => match runtime.stop() { + Ok(status) => { + self.component_status = Some(status); + Ok(()) + } + Err(error) => Err(error), + }, + }; + if let Err(error) = stop_result { + self.lifecycle = PluginInstanceLifecycleState::Failed; + self.record_runtime_error( + PluginInstanceDiagnosticKind::ServiceFailed, + format!("plugin component stop failed: {}", error.bounded_message()), + ); + return Err(error); } self.lifecycle = PluginInstanceLifecycleState::Stopped; + self.diagnostics.push(PluginInstanceDiagnostic::new( + PluginInstanceLifecycleState::Stopped, + "plugin service stopped", + )); Ok(()) } @@ -3250,40 +3604,79 @@ impl PluginInstance { plugin_ref: self.record.identity.to_string(), lifecycle: self.lifecycle.clone(), component_status: self.component_status.clone(), + queue_depth: self.ingress_queue.len(), + queue_capacity: self.ingress_queue_capacity, + last_error: self.last_error.clone(), + dispatch_counters: self.dispatch_counters.clone(), diagnostics: self.diagnostics.clone(), } } fn status(&mut self) -> PluginInstanceStatus { - if let PluginInstanceRuntime::ComponentInstance(runtime) = &mut self.runtime { - match runtime.status() { - Ok(status) => self.component_status = Some(status), - Err(error) => { - self.lifecycle = PluginInstanceLifecycleState::Failed; - self.diagnostics.push(PluginInstanceDiagnostic::new( - PluginInstanceLifecycleState::Failed, - format!( - "plugin component status failed: {}", - error.bounded_message() - ), - )); + if self.lifecycle == PluginInstanceLifecycleState::Running { + if let PluginInstanceRuntime::ComponentInstance(runtime) = &mut self.runtime { + match runtime.status() { + Ok(status) => self.component_status = Some(status), + Err(error) => { + self.lifecycle = PluginInstanceLifecycleState::Failed; + self.record_runtime_error( + PluginInstanceDiagnosticKind::ServiceFailed, + format!( + "plugin component status failed: {}", + error.bounded_message() + ), + ); + } } } } - PluginInstanceStatus { - plugin_ref: self.record.identity.to_string(), - lifecycle: self.lifecycle.clone(), - component_status: self.component_status.clone(), - diagnostics: self.diagnostics.clone(), + self.snapshot_status() + } + + fn record_rejection( + &mut self, + error: PluginIngressDispatchError, + ) -> PluginIngressDispatchError { + self.dispatch_counters.rejected += 1; + self.record_dispatch_error(&error); + error + } + + fn record_dispatch_error(&mut self, error: &PluginIngressDispatchError) { + let state = match error { + PluginIngressDispatchError::DispatchFailed(_) + | PluginIngressDispatchError::DispatchTimeout { .. } + | PluginIngressDispatchError::ServiceFailed(_) => PluginInstanceLifecycleState::Failed, + PluginIngressDispatchError::ServiceStopped { .. } => self.lifecycle.clone(), + _ => self.lifecycle.clone(), + }; + self.record_runtime_error(error.diagnostic_kind(), error.bounded_message()); + if matches!(state, PluginInstanceLifecycleState::Failed) { + self.lifecycle = PluginInstanceLifecycleState::Failed; } } + + fn record_runtime_error( + &mut self, + kind: PluginInstanceDiagnosticKind, + message: impl Into, + ) { + let message = bounded_message(redact_secret_like(&message.into())); + self.last_error = Some(message.clone()); + self.diagnostics.push(PluginInstanceDiagnostic::with_kind( + kind, + self.lifecycle.clone(), + message, + )); + } } enum PluginInstanceRuntime { ComponentToolAdapter, #[cfg(test)] TestIngress { - calls: u64, + tool_calls: u64, + ingress_calls: u64, }, ComponentInstance(PluginComponentInstanceRuntime), } @@ -3297,7 +3690,10 @@ impl PluginInstanceRuntime { }; match runtime.kind.as_str() { #[cfg(test)] - "test-ingress" => Ok(Self::TestIngress { calls: 0 }), + "test-ingress" => Ok(Self::TestIngress { + tool_calls: 0, + ingress_calls: 0, + }), PLUGIN_RUNTIME_COMPONENT_KIND if runtime.world.as_deref() == Some(PLUGIN_COMPONENT_INSTANCE_WORLD) => { @@ -4348,7 +4744,7 @@ mod tests { Some(PLUGIN_COMPONENT_INSTANCE_WORLD.into()); let handle = PluginInstanceHandle::new(record).unwrap(); let status = handle.status(); - assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Started); + assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Running); assert_eq!(status.component_status.unwrap()["data"]["phase"], "status"); let stopped = handle.stop().unwrap(); assert_eq!(stopped.lifecycle, PluginInstanceLifecycleState::Stopped); @@ -4418,6 +4814,10 @@ mod tests { .push(PluginPermission::ingress(name)); } + fn test_ingress_event(ingress_name: &str, payload: Value) -> PluginIngressEvent { + PluginIngressEvent::new(ingress_name, "test", "unit", payload) + } + #[test] fn service_selected_ignores_unselected_tool_without_grants() { let mut record = record(vec![tool("hidden_tool")]); @@ -4440,7 +4840,7 @@ mod tests { assert_eq!(report.reports[0].provided_services.len(), 1); assert_eq!( feature.instance_status().unwrap().lifecycle, - PluginInstanceLifecycleState::Started + PluginInstanceLifecycleState::Running ); } @@ -4462,11 +4862,7 @@ mod tests { assert!(report.reports[0].provided_services.is_empty()); let dispatch = feature.dispatch_ingress( "hidden_ingress", - PluginIngressEvent { - kind: "test".into(), - source: "unit".into(), - payload: serde_json::json!({}), - }, + test_ingress_event("hidden_ingress", serde_json::json!({})), ); assert!( dispatch @@ -4494,7 +4890,174 @@ mod tests { "{report:#?}" ); let status = feature.instance_status().expect("service instance started"); - assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Started); + assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Running); + assert_eq!(status.queue_depth, 0); + assert_eq!(status.queue_capacity, PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY); + assert!(status.last_error.is_none()); + } + + fn test_service_ingress_record() -> ResolvedPluginRecord { + let mut record = record(Vec::new()); + add_service(&mut record, "svc"); + add_ingress(&mut record, "shared_ingress"); + record.manifest.runtime = Some(manifest::plugin::PluginRuntimeManifest { + kind: "test-ingress".into(), + entry: None, + abi: None, + component: None, + world: Some(PLUGIN_COMPONENT_INSTANCE_WORLD.into()), + }); + record + } + + #[test] + fn ingress_queue_dispatches_serially_and_reports_status() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + assert_eq!( + handle.status().lifecycle, + PluginInstanceLifecycleState::Running + ); + + let first = handle + .deliver_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "seq": 1 })), + ) + .unwrap(); + let second = handle + .deliver_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "seq": 2 })), + ) + .unwrap(); + + assert_eq!(first.output["ingress_calls"], 1); + assert_eq!(second.output["ingress_calls"], 2); + assert_eq!(second.queue_depth, 0); + assert_eq!(second.dispatch_counters.enqueued, 2); + assert_eq!(second.dispatch_counters.dispatched, 2); + let status = handle.status(); + assert_eq!(status.queue_depth, 0); + assert_eq!(status.dispatch_counters.enqueued, 2); + assert_eq!(status.dispatch_counters.dispatched, 2); + assert!(status.last_error.is_none()); + } + + #[test] + fn bounded_ingress_queue_rejects_full_queue() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + let mut instance = handle.0.lock().unwrap(); + instance.ingress_queue_capacity = 1; + instance + .enqueue_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "seq": 1 })), + ) + .unwrap(); + let error = instance + .enqueue_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "seq": 2 })), + ) + .unwrap_err(); + assert!(matches!( + error, + PluginIngressDispatchError::QueueFull { capacity: 1 } + )); + assert_eq!(instance.ingress_queue.len(), 1); + assert_eq!(instance.dispatch_counters.rejected, 1); + assert_eq!( + instance.diagnostics.last().unwrap().kind, + PluginInstanceDiagnosticKind::QueueFull + ); + } + + #[test] + fn ingress_dispatch_failure_marks_service_failed_and_rejects_later_events() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + let error = handle + .deliver_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "fail": true })), + ) + .unwrap_err(); + assert!(matches!( + error, + PluginIngressDispatchError::DispatchFailed(_) + )); + let status = handle.status(); + assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Failed); + assert_eq!(status.dispatch_counters.failed, 1); + assert_eq!( + status.diagnostics.last().unwrap().kind, + PluginInstanceDiagnosticKind::DispatchFailed + ); + + let retry = handle + .deliver_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "seq": 3 })), + ) + .unwrap_err(); + assert!(matches!( + retry, + PluginIngressDispatchError::ServiceFailed(_) + )); + } + + #[test] + fn ingress_dispatch_timeout_records_typed_diagnostic() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + let error = handle + .deliver_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "sleep_ms": 50 })), + ) + .unwrap_err(); + assert!(matches!( + error, + PluginIngressDispatchError::DispatchTimeout { .. } + )); + let status = handle.status(); + assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Failed); + assert_eq!(status.dispatch_counters.timed_out, 1); + assert_eq!( + status.diagnostics.last().unwrap().kind, + PluginInstanceDiagnosticKind::DispatchTimeout + ); + } + + #[test] + fn stopped_service_rejects_ingress_events() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + handle.stop().unwrap(); + let error = handle + .deliver_ingress( + "shared_ingress", + test_ingress_event("shared_ingress", serde_json::json!({ "seq": 1 })), + ) + .unwrap_err(); + assert!(matches!( + error, + PluginIngressDispatchError::ServiceStopped { + state: PluginInstanceLifecycleState::Stopped + } + )); + } + + #[test] + fn invalid_ingress_event_is_typed_rejection() { + let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap(); + let mut event = test_ingress_event("shared_ingress", serde_json::json!({})); + event.correlation_id.clear(); + let error = handle.deliver_ingress("shared_ingress", event).unwrap_err(); + assert!(matches!(error, PluginIngressDispatchError::InvalidEvent(_))); + let status = handle.status(); + assert_eq!(status.dispatch_counters.rejected, 1); + assert_eq!( + status.diagnostics.last().unwrap().kind, + PluginInstanceDiagnosticKind::InvalidEvent + ); } #[test] @@ -4530,11 +5093,7 @@ mod tests { let report = feature .dispatch_ingress( "shared_ingress", - PluginIngressEvent { - kind: "test".into(), - source: "unit".into(), - payload: serde_json::json!({ "hello": "world" }), - }, + test_ingress_event("shared_ingress", serde_json::json!({ "hello": "world" })), ) .unwrap(); assert!(report.accepted); @@ -4575,9 +5134,16 @@ mod tests { .push(PluginPermission::ingress("shared_ingress")); let handle = PluginInstanceHandle(Arc::new(Mutex::new(PluginInstance { record, - runtime: PluginInstanceRuntime::TestIngress { calls: 0 }, - lifecycle: PluginInstanceLifecycleState::Started, + runtime: PluginInstanceRuntime::TestIngress { + tool_calls: 0, + ingress_calls: 0, + }, + lifecycle: PluginInstanceLifecycleState::Running, component_status: None, + ingress_queue: VecDeque::new(), + ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY, + dispatch_counters: PluginIngressDispatchCounters::default(), + last_error: None, diagnostics: Vec::new(), }))); @@ -4587,11 +5153,7 @@ mod tests { let report = handle .deliver_ingress( "shared_ingress", - PluginIngressEvent { - kind: "test".into(), - source: "unit".into(), - payload: serde_json::json!({ "hello": "world" }), - }, + test_ingress_event("shared_ingress", serde_json::json!({ "hello": "world" })), ) .unwrap(); assert!(report.accepted);