feat: add plugin service ingress queue

This commit is contained in:
Keisuke Hirata 2026-06-25 06:42:56 +09:00
parent f26c7e0d09
commit 4e138b7e36
No known key found for this signature in database

View File

@ -7,7 +7,7 @@
//! host APIs through explicit imports with matching permissions and scoped
//! allowlist grants.
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fs;
use std::io::{Read as _, Write as _};
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
@ -103,16 +103,18 @@ impl PluginToolFeature {
&self,
ingress_name: &str,
event: PluginIngressEvent,
) -> Result<PluginIngressDispatchReport, PluginWasmError> {
) -> Result<PluginIngressDispatchReport, PluginIngressDispatchError> {
if !surface_enabled(&self.record, PluginSurface::Ingress) {
return Err(PluginWasmError::Module(
return Err(PluginIngressDispatchError::InvalidEvent(
"plugin ingress surface is not enabled".to_string(),
));
}
let handle = self
.registry
.handle(&self.record.identity.to_string())
.ok_or_else(|| PluginWasmError::Module("plugin instance is not started".to_string()))?;
.ok_or_else(|| PluginIngressDispatchError::ServiceUnavailable {
state: PluginInstanceLifecycleState::Stopped,
})?;
handle.deliver_ingress(ingress_name, event)
}
@ -2280,6 +2282,11 @@ const PLUGIN_WASM_MAX_OUTPUT_BYTES: usize = 64 * 1024;
const PLUGIN_WASM_MAX_SUMMARY_BYTES: usize = 1024;
const PLUGIN_WASM_FUEL: u64 = 5_000_000;
const PLUGIN_WASM_TIMEOUT: Duration = Duration::from_secs(1);
const PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY: usize = 32;
#[cfg(test)]
const PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT: Duration = Duration::from_millis(25);
#[cfg(not(test))]
const PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT: Duration = Duration::from_secs(1);
const PLUGIN_WASM_MEMORY_BYTES: usize = 2 * 1024 * 1024;
const PLUGIN_WASM_TABLE_ELEMENTS: usize = 256;
const PLUGIN_REQUEST_MAX_REQUEST_BYTES: usize = 48 * 1024;
@ -2933,31 +2940,68 @@ impl PluginRequestError {
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub enum PluginInstanceLifecycleState {
Ready,
Started,
Starting,
Running,
Stopping,
Stopped,
Failed,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub enum PluginInstanceDiagnosticKind {
Lifecycle,
InvalidEvent,
QueueFull,
DispatchTimeout,
DispatchFailed,
ServiceUnavailable,
ServiceFailed,
ServiceStopped,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct PluginInstanceDiagnostic {
pub kind: PluginInstanceDiagnosticKind,
pub state: PluginInstanceLifecycleState,
pub message: String,
}
impl PluginInstanceDiagnostic {
pub fn new(state: PluginInstanceLifecycleState, message: impl Into<String>) -> Self {
Self::with_kind(PluginInstanceDiagnosticKind::Lifecycle, state, message)
}
pub fn with_kind(
kind: PluginInstanceDiagnosticKind,
state: PluginInstanceLifecycleState,
message: impl Into<String>,
) -> Self {
Self {
kind,
state,
message: bounded_message(message.into()),
message: bounded_message(redact_secret_like(&message.into())),
}
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct PluginIngressDispatchCounters {
pub enqueued: u64,
pub dispatched: u64,
pub rejected: u64,
pub failed: u64,
pub timed_out: u64,
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct PluginInstanceStatus {
pub plugin_ref: String,
pub lifecycle: PluginInstanceLifecycleState,
pub component_status: Option<Value>,
pub queue_depth: usize,
pub queue_capacity: usize,
pub last_error: Option<String>,
pub dispatch_counters: PluginIngressDispatchCounters,
pub diagnostics: Vec<PluginInstanceDiagnostic>,
}
@ -2965,7 +3009,30 @@ pub struct PluginInstanceStatus {
pub struct PluginIngressEvent {
pub kind: String,
pub source: String,
pub ingress_name: String,
pub payload: Value,
pub created_at: String,
pub attempt: u32,
pub correlation_id: String,
}
impl PluginIngressEvent {
pub fn new(
ingress_name: impl Into<String>,
kind: impl Into<String>,
source: impl Into<String>,
payload: Value,
) -> Self {
Self {
kind: kind.into(),
source: source.into(),
ingress_name: ingress_name.into(),
payload,
created_at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
attempt: 1,
correlation_id: uuid::Uuid::now_v7().to_string(),
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]
@ -2974,9 +3041,80 @@ pub struct PluginIngressDispatchReport {
pub ingress: String,
pub accepted: bool,
pub output: Value,
pub queue_depth: usize,
pub dispatch_counters: PluginIngressDispatchCounters,
pub diagnostics: Vec<PluginInstanceDiagnostic>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PluginIngressDispatchError {
InvalidEvent(String),
QueueFull { capacity: usize },
ServiceUnavailable { state: PluginInstanceLifecycleState },
ServiceFailed(String),
ServiceStopped { state: PluginInstanceLifecycleState },
DispatchTimeout { timeout: Duration },
DispatchFailed(String),
}
impl PluginIngressDispatchError {
fn bounded_message(&self) -> String {
match self {
Self::InvalidEvent(message) => bounded_message(format!(
"invalid plugin ingress event: {}",
redact_secret_like(message)
)),
Self::QueueFull { capacity } => bounded_message(format!(
"plugin ingress queue is full (capacity {capacity})"
)),
Self::ServiceUnavailable { state } => bounded_message(format!(
"plugin service is not running for ingress dispatch (state {state:?})"
)),
Self::ServiceFailed(message) => bounded_message(format!(
"plugin service is failed for ingress dispatch: {}",
redact_secret_like(message)
)),
Self::ServiceStopped { state } => bounded_message(format!(
"plugin service rejects ingress while stopping/stopped (state {state:?})"
)),
Self::DispatchTimeout { timeout } => bounded_message(format!(
"plugin ingress dispatch timed out after {timeout:?}"
)),
Self::DispatchFailed(message) => bounded_message(format!(
"plugin ingress dispatch failed closed: {}",
redact_secret_like(message)
)),
}
}
fn diagnostic_kind(&self) -> PluginInstanceDiagnosticKind {
match self {
Self::InvalidEvent(_) => PluginInstanceDiagnosticKind::InvalidEvent,
Self::QueueFull { .. } => PluginInstanceDiagnosticKind::QueueFull,
Self::ServiceUnavailable { .. } => PluginInstanceDiagnosticKind::ServiceUnavailable,
Self::ServiceFailed(_) => PluginInstanceDiagnosticKind::ServiceFailed,
Self::ServiceStopped { .. } => PluginInstanceDiagnosticKind::ServiceStopped,
Self::DispatchTimeout { .. } => PluginInstanceDiagnosticKind::DispatchTimeout,
Self::DispatchFailed(_) => PluginInstanceDiagnosticKind::DispatchFailed,
}
}
}
impl std::fmt::Display for PluginIngressDispatchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.bounded_message())
}
}
impl std::error::Error for PluginIngressDispatchError {}
#[derive(Clone, Debug)]
struct QueuedPluginIngress {
ingress_name: String,
event: PluginIngressEvent,
enqueued_at: Instant,
}
#[derive(Clone, Default)]
pub struct PluginInstanceRegistry {
instances: Arc<Mutex<HashMap<String, PluginInstanceHandle>>>,
@ -3043,6 +3181,10 @@ impl PluginInstanceHandle {
runtime,
lifecycle: PluginInstanceLifecycleState::Ready,
component_status: None,
ingress_queue: VecDeque::new(),
ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY,
dispatch_counters: PluginIngressDispatchCounters::default(),
last_error: None,
diagnostics: Vec::new(),
};
instance.start()?;
@ -3060,11 +3202,10 @@ impl PluginInstanceHandle {
&self,
ingress_name: &str,
event: PluginIngressEvent,
) -> Result<PluginIngressDispatchReport, PluginWasmError> {
self.0
.lock()
.expect("plugin instance poisoned")
.deliver_ingress(ingress_name, event)
) -> Result<PluginIngressDispatchReport, PluginIngressDispatchError> {
let mut instance = self.0.lock().expect("plugin instance poisoned");
instance.enqueue_ingress(ingress_name, event)?;
instance.dispatch_next_ingress()
}
pub fn status(&self) -> PluginInstanceStatus {
@ -3081,6 +3222,7 @@ impl PluginInstanceHandle {
fn record_diagnostic(&self, diagnostic: PluginInstanceDiagnostic) {
if let Ok(mut instance) = self.0.lock() {
instance.lifecycle = diagnostic.state.clone();
instance.last_error = Some(diagnostic.message.clone());
instance.diagnostics.push(diagnostic);
}
}
@ -3091,28 +3233,56 @@ struct PluginInstance {
runtime: PluginInstanceRuntime,
lifecycle: PluginInstanceLifecycleState,
component_status: Option<Value>,
ingress_queue: VecDeque<QueuedPluginIngress>,
ingress_queue_capacity: usize,
dispatch_counters: PluginIngressDispatchCounters,
last_error: Option<String>,
diagnostics: Vec<PluginInstanceDiagnostic>,
}
impl PluginInstance {
fn start(&mut self) -> Result<(), PluginWasmError> {
match &mut self.runtime {
self.lifecycle = PluginInstanceLifecycleState::Starting;
let start_result = match &mut self.runtime {
PluginInstanceRuntime::ComponentToolAdapter => {
self.lifecycle = PluginInstanceLifecycleState::Ready;
self.diagnostics.push(PluginInstanceDiagnostic::new(
PluginInstanceLifecycleState::Ready,
"component tool runtime registered behind PluginInstanceRegistry",
));
Ok(())
}
#[cfg(test)]
PluginInstanceRuntime::TestIngress { .. } => {
self.lifecycle = PluginInstanceLifecycleState::Started;
self.lifecycle = PluginInstanceLifecycleState::Running;
self.diagnostics.push(PluginInstanceDiagnostic::new(
PluginInstanceLifecycleState::Running,
"test ingress runtime initialized",
));
Ok(())
}
PluginInstanceRuntime::ComponentInstance(runtime) => {
let status = runtime.start(&self.record)?;
self.component_status = Some(status);
self.lifecycle = PluginInstanceLifecycleState::Started;
match runtime.start(&self.record) {
Ok(status) => {
self.component_status = Some(status);
self.lifecycle = PluginInstanceLifecycleState::Running;
self.diagnostics.push(PluginInstanceDiagnostic::new(
PluginInstanceLifecycleState::Running,
"component instance start returned; host-managed ingress queue is running",
));
Ok(())
}
Err(error) => Err(error),
}
}
};
if let Err(error) = start_result {
self.lifecycle = PluginInstanceLifecycleState::Failed;
self.record_runtime_error(
PluginInstanceDiagnosticKind::ServiceFailed,
format!("plugin component start failed: {}", error.bounded_message()),
);
return Err(error);
}
Ok(())
}
@ -3149,10 +3319,10 @@ impl PluginInstance {
run_plugin_component_tool(self.record.clone(), tool_name.to_string(), input)
}
#[cfg(test)]
PluginInstanceRuntime::TestIngress { calls } => {
*calls += 1;
PluginInstanceRuntime::TestIngress { tool_calls, .. } => {
*tool_calls += 1;
Ok(ToolOutput {
summary: format!("{tool_name}: {calls}"),
summary: format!("{tool_name}: {tool_calls}"),
content: Some(String::from_utf8_lossy(&input).to_string()),
})
}
@ -3162,53 +3332,208 @@ impl PluginInstance {
}
}
fn deliver_ingress(
fn enqueue_ingress(
&mut self,
ingress_name: &str,
event: PluginIngressEvent,
) -> Result<PluginIngressDispatchReport, PluginWasmError> {
) -> Result<(), PluginIngressDispatchError> {
self.validate_ingress_event(ingress_name, &event)
.map_err(|error| self.record_rejection(error))?;
if self.ingress_queue.len() >= self.ingress_queue_capacity {
let error = PluginIngressDispatchError::QueueFull {
capacity: self.ingress_queue_capacity,
};
return Err(self.record_rejection(error));
}
self.ingress_queue.push_back(QueuedPluginIngress {
ingress_name: ingress_name.to_string(),
event,
enqueued_at: Instant::now(),
});
self.dispatch_counters.enqueued += 1;
Ok(())
}
fn validate_ingress_event(
&self,
ingress_name: &str,
event: &PluginIngressEvent,
) -> Result<(), PluginIngressDispatchError> {
if !surface_enabled(&self.record, PluginSurface::Ingress) {
return Err(PluginWasmError::Module(
return Err(PluginIngressDispatchError::InvalidEvent(
"plugin ingress surface is not enabled".to_string(),
));
}
if serde_json::to_vec(&event)
match self.lifecycle {
PluginInstanceLifecycleState::Running => {}
PluginInstanceLifecycleState::Failed => {
return Err(PluginIngressDispatchError::ServiceFailed(
self.last_error
.clone()
.unwrap_or_else(|| "service is failed".to_string()),
));
}
PluginInstanceLifecycleState::Stopping | PluginInstanceLifecycleState::Stopped => {
return Err(PluginIngressDispatchError::ServiceStopped {
state: self.lifecycle.clone(),
});
}
_ => {
return Err(PluginIngressDispatchError::ServiceUnavailable {
state: self.lifecycle.clone(),
});
}
}
if event.source.trim().is_empty() {
return Err(PluginIngressDispatchError::InvalidEvent(
"source must not be empty".to_string(),
));
}
if event.ingress_name != ingress_name {
return Err(PluginIngressDispatchError::InvalidEvent(format!(
"event ingress `{}` does not match dispatch ingress `{ingress_name}`",
event.ingress_name
)));
}
if event.kind.trim().is_empty() {
return Err(PluginIngressDispatchError::InvalidEvent(
"event kind must not be empty".to_string(),
));
}
if event.created_at.trim().is_empty() {
return Err(PluginIngressDispatchError::InvalidEvent(
"created_at must not be empty".to_string(),
));
}
if event.attempt == 0 {
return Err(PluginIngressDispatchError::InvalidEvent(
"attempt must be greater than zero".to_string(),
));
}
if event.correlation_id.trim().is_empty() {
return Err(PluginIngressDispatchError::InvalidEvent(
"correlation_id must not be empty".to_string(),
));
}
if serde_json::to_vec(event)
.map(|bytes| bytes.len())
.unwrap_or(usize::MAX)
> PLUGIN_WASM_MAX_INPUT_BYTES
{
return Err(PluginWasmError::Module(format!(
return Err(PluginIngressDispatchError::InvalidEvent(format!(
"plugin ingress event exceeds {} bytes",
PLUGIN_WASM_MAX_INPUT_BYTES
)));
}
self.record
let ingress = self
.record
.manifest
.ingresses
.iter()
.find(|ingress| ingress.name == ingress_name)
.ok_or_else(|| {
PluginWasmError::Module(
PluginIngressDispatchError::InvalidEvent(
"requested ingress is not declared by plugin manifest".to_string(),
)
})?;
if !ingress.event_kinds.is_empty()
&& !ingress.event_kinds.iter().any(|kind| kind == &event.kind)
{
return Err(PluginIngressDispatchError::InvalidEvent(format!(
"event kind `{}` is not declared for ingress `{ingress_name}`",
event.kind
)));
}
authorize_plugin_ingress(&self.record, ingress_name).map_err(|error| {
PluginWasmError::Module(format!(
PluginIngressDispatchError::InvalidEvent(format!(
"plugin ingress permission denied: {}",
error.bounded_message()
))
})?;
Ok(())
}
fn dispatch_next_ingress(
&mut self,
) -> Result<PluginIngressDispatchReport, PluginIngressDispatchError> {
let Some(queued) = self.ingress_queue.pop_front() else {
return Err(PluginIngressDispatchError::InvalidEvent(
"plugin ingress queue is empty".to_string(),
));
};
let started_at = Instant::now();
let result = self.dispatch_ingress_now(&queued.ingress_name, queued.event);
let elapsed = started_at.elapsed();
if elapsed > PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT {
let error = PluginIngressDispatchError::DispatchTimeout {
timeout: PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT,
};
self.dispatch_counters.timed_out += 1;
self.dispatch_counters.failed += 1;
self.lifecycle = PluginInstanceLifecycleState::Failed;
self.record_dispatch_error(&error);
return Err(error);
}
match result {
Ok(mut report) => {
let queue_latency_ms = queued.enqueued_at.elapsed().as_millis() as u64;
if report.output.get("queue_latency_ms").is_none() {
if let Some(map) = report.output.as_object_mut() {
map.insert(
"queue_latency_ms".to_string(),
Value::from(queue_latency_ms),
);
}
}
self.dispatch_counters.dispatched += 1;
report.queue_depth = self.ingress_queue.len();
report.dispatch_counters = self.dispatch_counters.clone();
report.diagnostics = self.diagnostics.clone();
Ok(report)
}
Err(error) => {
self.dispatch_counters.failed += 1;
self.lifecycle = PluginInstanceLifecycleState::Failed;
let dispatch_error =
PluginIngressDispatchError::DispatchFailed(error.bounded_message());
self.record_dispatch_error(&dispatch_error);
Err(dispatch_error)
}
}
}
fn dispatch_ingress_now(
&mut self,
ingress_name: &str,
event: PluginIngressEvent,
) -> Result<PluginIngressDispatchReport, PluginWasmError> {
match &mut self.runtime {
PluginInstanceRuntime::ComponentToolAdapter => Err(PluginWasmError::Module(
"component tool runtime does not expose ingress dispatch".to_string(),
)),
#[cfg(test)]
PluginInstanceRuntime::TestIngress { calls } => {
PluginInstanceRuntime::TestIngress {
tool_calls,
ingress_calls,
} => {
if let Some(sleep_ms) = event.payload.get("sleep_ms").and_then(Value::as_u64) {
std::thread::sleep(Duration::from_millis(sleep_ms));
}
if event.payload.get("fail").and_then(Value::as_bool) == Some(true) {
return Err(PluginWasmError::Execution(
"test ingress requested failure".to_string(),
));
}
*ingress_calls += 1;
let output = serde_json::json!({
"ingress": ingress_name,
"kind": event.kind,
"source": event.source,
"calls": *calls,
"ingress_name": event.ingress_name,
"attempt": event.attempt,
"correlation_id": event.correlation_id,
"calls": *tool_calls,
"ingress_calls": *ingress_calls,
"payload": event.payload,
});
Ok(PluginIngressDispatchReport {
@ -3216,6 +3541,8 @@ impl PluginInstance {
ingress: ingress_name.to_string(),
accepted: true,
output,
queue_depth: self.ingress_queue.len(),
dispatch_counters: self.dispatch_counters.clone(),
diagnostics: self.diagnostics.clone(),
})
}
@ -3226,6 +3553,8 @@ impl PluginInstance {
ingress: ingress_name.to_string(),
accepted: true,
output,
queue_depth: self.ingress_queue.len(),
dispatch_counters: self.dispatch_counters.clone(),
diagnostics: self.diagnostics.clone(),
})
}
@ -3233,15 +3562,40 @@ impl PluginInstance {
}
fn stop(&mut self) -> Result<(), PluginWasmError> {
match &mut self.runtime {
PluginInstanceRuntime::ComponentToolAdapter => {}
if self.lifecycle == PluginInstanceLifecycleState::Stopped {
return Ok(());
}
self.lifecycle = PluginInstanceLifecycleState::Stopping;
self.diagnostics.push(PluginInstanceDiagnostic::new(
PluginInstanceLifecycleState::Stopping,
"plugin service stop requested; ingress queue is closed",
));
self.ingress_queue.clear();
let stop_result = match &mut self.runtime {
PluginInstanceRuntime::ComponentToolAdapter => Ok(()),
#[cfg(test)]
PluginInstanceRuntime::TestIngress { .. } => {}
PluginInstanceRuntime::ComponentInstance(runtime) => {
self.component_status = Some(runtime.stop()?);
}
PluginInstanceRuntime::TestIngress { .. } => Ok(()),
PluginInstanceRuntime::ComponentInstance(runtime) => match runtime.stop() {
Ok(status) => {
self.component_status = Some(status);
Ok(())
}
Err(error) => Err(error),
},
};
if let Err(error) = stop_result {
self.lifecycle = PluginInstanceLifecycleState::Failed;
self.record_runtime_error(
PluginInstanceDiagnosticKind::ServiceFailed,
format!("plugin component stop failed: {}", error.bounded_message()),
);
return Err(error);
}
self.lifecycle = PluginInstanceLifecycleState::Stopped;
self.diagnostics.push(PluginInstanceDiagnostic::new(
PluginInstanceLifecycleState::Stopped,
"plugin service stopped",
));
Ok(())
}
@ -3250,40 +3604,79 @@ impl PluginInstance {
plugin_ref: self.record.identity.to_string(),
lifecycle: self.lifecycle.clone(),
component_status: self.component_status.clone(),
queue_depth: self.ingress_queue.len(),
queue_capacity: self.ingress_queue_capacity,
last_error: self.last_error.clone(),
dispatch_counters: self.dispatch_counters.clone(),
diagnostics: self.diagnostics.clone(),
}
}
fn status(&mut self) -> PluginInstanceStatus {
if let PluginInstanceRuntime::ComponentInstance(runtime) = &mut self.runtime {
match runtime.status() {
Ok(status) => self.component_status = Some(status),
Err(error) => {
self.lifecycle = PluginInstanceLifecycleState::Failed;
self.diagnostics.push(PluginInstanceDiagnostic::new(
PluginInstanceLifecycleState::Failed,
format!(
"plugin component status failed: {}",
error.bounded_message()
),
));
if self.lifecycle == PluginInstanceLifecycleState::Running {
if let PluginInstanceRuntime::ComponentInstance(runtime) = &mut self.runtime {
match runtime.status() {
Ok(status) => self.component_status = Some(status),
Err(error) => {
self.lifecycle = PluginInstanceLifecycleState::Failed;
self.record_runtime_error(
PluginInstanceDiagnosticKind::ServiceFailed,
format!(
"plugin component status failed: {}",
error.bounded_message()
),
);
}
}
}
}
PluginInstanceStatus {
plugin_ref: self.record.identity.to_string(),
lifecycle: self.lifecycle.clone(),
component_status: self.component_status.clone(),
diagnostics: self.diagnostics.clone(),
self.snapshot_status()
}
fn record_rejection(
&mut self,
error: PluginIngressDispatchError,
) -> PluginIngressDispatchError {
self.dispatch_counters.rejected += 1;
self.record_dispatch_error(&error);
error
}
fn record_dispatch_error(&mut self, error: &PluginIngressDispatchError) {
let state = match error {
PluginIngressDispatchError::DispatchFailed(_)
| PluginIngressDispatchError::DispatchTimeout { .. }
| PluginIngressDispatchError::ServiceFailed(_) => PluginInstanceLifecycleState::Failed,
PluginIngressDispatchError::ServiceStopped { .. } => self.lifecycle.clone(),
_ => self.lifecycle.clone(),
};
self.record_runtime_error(error.diagnostic_kind(), error.bounded_message());
if matches!(state, PluginInstanceLifecycleState::Failed) {
self.lifecycle = PluginInstanceLifecycleState::Failed;
}
}
fn record_runtime_error(
&mut self,
kind: PluginInstanceDiagnosticKind,
message: impl Into<String>,
) {
let message = bounded_message(redact_secret_like(&message.into()));
self.last_error = Some(message.clone());
self.diagnostics.push(PluginInstanceDiagnostic::with_kind(
kind,
self.lifecycle.clone(),
message,
));
}
}
enum PluginInstanceRuntime {
ComponentToolAdapter,
#[cfg(test)]
TestIngress {
calls: u64,
tool_calls: u64,
ingress_calls: u64,
},
ComponentInstance(PluginComponentInstanceRuntime),
}
@ -3297,7 +3690,10 @@ impl PluginInstanceRuntime {
};
match runtime.kind.as_str() {
#[cfg(test)]
"test-ingress" => Ok(Self::TestIngress { calls: 0 }),
"test-ingress" => Ok(Self::TestIngress {
tool_calls: 0,
ingress_calls: 0,
}),
PLUGIN_RUNTIME_COMPONENT_KIND
if runtime.world.as_deref() == Some(PLUGIN_COMPONENT_INSTANCE_WORLD) =>
{
@ -4348,7 +4744,7 @@ mod tests {
Some(PLUGIN_COMPONENT_INSTANCE_WORLD.into());
let handle = PluginInstanceHandle::new(record).unwrap();
let status = handle.status();
assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Started);
assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Running);
assert_eq!(status.component_status.unwrap()["data"]["phase"], "status");
let stopped = handle.stop().unwrap();
assert_eq!(stopped.lifecycle, PluginInstanceLifecycleState::Stopped);
@ -4418,6 +4814,10 @@ mod tests {
.push(PluginPermission::ingress(name));
}
fn test_ingress_event(ingress_name: &str, payload: Value) -> PluginIngressEvent {
PluginIngressEvent::new(ingress_name, "test", "unit", payload)
}
#[test]
fn service_selected_ignores_unselected_tool_without_grants() {
let mut record = record(vec![tool("hidden_tool")]);
@ -4440,7 +4840,7 @@ mod tests {
assert_eq!(report.reports[0].provided_services.len(), 1);
assert_eq!(
feature.instance_status().unwrap().lifecycle,
PluginInstanceLifecycleState::Started
PluginInstanceLifecycleState::Running
);
}
@ -4462,11 +4862,7 @@ mod tests {
assert!(report.reports[0].provided_services.is_empty());
let dispatch = feature.dispatch_ingress(
"hidden_ingress",
PluginIngressEvent {
kind: "test".into(),
source: "unit".into(),
payload: serde_json::json!({}),
},
test_ingress_event("hidden_ingress", serde_json::json!({})),
);
assert!(
dispatch
@ -4494,7 +4890,174 @@ mod tests {
"{report:#?}"
);
let status = feature.instance_status().expect("service instance started");
assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Started);
assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Running);
assert_eq!(status.queue_depth, 0);
assert_eq!(status.queue_capacity, PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY);
assert!(status.last_error.is_none());
}
fn test_service_ingress_record() -> ResolvedPluginRecord {
let mut record = record(Vec::new());
add_service(&mut record, "svc");
add_ingress(&mut record, "shared_ingress");
record.manifest.runtime = Some(manifest::plugin::PluginRuntimeManifest {
kind: "test-ingress".into(),
entry: None,
abi: None,
component: None,
world: Some(PLUGIN_COMPONENT_INSTANCE_WORLD.into()),
});
record
}
#[test]
fn ingress_queue_dispatches_serially_and_reports_status() {
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
assert_eq!(
handle.status().lifecycle,
PluginInstanceLifecycleState::Running
);
let first = handle
.deliver_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "seq": 1 })),
)
.unwrap();
let second = handle
.deliver_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "seq": 2 })),
)
.unwrap();
assert_eq!(first.output["ingress_calls"], 1);
assert_eq!(second.output["ingress_calls"], 2);
assert_eq!(second.queue_depth, 0);
assert_eq!(second.dispatch_counters.enqueued, 2);
assert_eq!(second.dispatch_counters.dispatched, 2);
let status = handle.status();
assert_eq!(status.queue_depth, 0);
assert_eq!(status.dispatch_counters.enqueued, 2);
assert_eq!(status.dispatch_counters.dispatched, 2);
assert!(status.last_error.is_none());
}
#[test]
fn bounded_ingress_queue_rejects_full_queue() {
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
let mut instance = handle.0.lock().unwrap();
instance.ingress_queue_capacity = 1;
instance
.enqueue_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "seq": 1 })),
)
.unwrap();
let error = instance
.enqueue_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "seq": 2 })),
)
.unwrap_err();
assert!(matches!(
error,
PluginIngressDispatchError::QueueFull { capacity: 1 }
));
assert_eq!(instance.ingress_queue.len(), 1);
assert_eq!(instance.dispatch_counters.rejected, 1);
assert_eq!(
instance.diagnostics.last().unwrap().kind,
PluginInstanceDiagnosticKind::QueueFull
);
}
#[test]
fn ingress_dispatch_failure_marks_service_failed_and_rejects_later_events() {
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
let error = handle
.deliver_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "fail": true })),
)
.unwrap_err();
assert!(matches!(
error,
PluginIngressDispatchError::DispatchFailed(_)
));
let status = handle.status();
assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Failed);
assert_eq!(status.dispatch_counters.failed, 1);
assert_eq!(
status.diagnostics.last().unwrap().kind,
PluginInstanceDiagnosticKind::DispatchFailed
);
let retry = handle
.deliver_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "seq": 3 })),
)
.unwrap_err();
assert!(matches!(
retry,
PluginIngressDispatchError::ServiceFailed(_)
));
}
#[test]
fn ingress_dispatch_timeout_records_typed_diagnostic() {
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
let error = handle
.deliver_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "sleep_ms": 50 })),
)
.unwrap_err();
assert!(matches!(
error,
PluginIngressDispatchError::DispatchTimeout { .. }
));
let status = handle.status();
assert_eq!(status.lifecycle, PluginInstanceLifecycleState::Failed);
assert_eq!(status.dispatch_counters.timed_out, 1);
assert_eq!(
status.diagnostics.last().unwrap().kind,
PluginInstanceDiagnosticKind::DispatchTimeout
);
}
#[test]
fn stopped_service_rejects_ingress_events() {
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
handle.stop().unwrap();
let error = handle
.deliver_ingress(
"shared_ingress",
test_ingress_event("shared_ingress", serde_json::json!({ "seq": 1 })),
)
.unwrap_err();
assert!(matches!(
error,
PluginIngressDispatchError::ServiceStopped {
state: PluginInstanceLifecycleState::Stopped
}
));
}
#[test]
fn invalid_ingress_event_is_typed_rejection() {
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
let mut event = test_ingress_event("shared_ingress", serde_json::json!({}));
event.correlation_id.clear();
let error = handle.deliver_ingress("shared_ingress", event).unwrap_err();
assert!(matches!(error, PluginIngressDispatchError::InvalidEvent(_)));
let status = handle.status();
assert_eq!(status.dispatch_counters.rejected, 1);
assert_eq!(
status.diagnostics.last().unwrap().kind,
PluginInstanceDiagnosticKind::InvalidEvent
);
}
#[test]
@ -4530,11 +5093,7 @@ mod tests {
let report = feature
.dispatch_ingress(
"shared_ingress",
PluginIngressEvent {
kind: "test".into(),
source: "unit".into(),
payload: serde_json::json!({ "hello": "world" }),
},
test_ingress_event("shared_ingress", serde_json::json!({ "hello": "world" })),
)
.unwrap();
assert!(report.accepted);
@ -4575,9 +5134,16 @@ mod tests {
.push(PluginPermission::ingress("shared_ingress"));
let handle = PluginInstanceHandle(Arc::new(Mutex::new(PluginInstance {
record,
runtime: PluginInstanceRuntime::TestIngress { calls: 0 },
lifecycle: PluginInstanceLifecycleState::Started,
runtime: PluginInstanceRuntime::TestIngress {
tool_calls: 0,
ingress_calls: 0,
},
lifecycle: PluginInstanceLifecycleState::Running,
component_status: None,
ingress_queue: VecDeque::new(),
ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY,
dispatch_counters: PluginIngressDispatchCounters::default(),
last_error: None,
diagnostics: Vec::new(),
})));
@ -4587,11 +5153,7 @@ mod tests {
let report = handle
.deliver_ingress(
"shared_ingress",
PluginIngressEvent {
kind: "test".into(),
source: "unit".into(),
payload: serde_json::json!({ "hello": "world" }),
},
test_ingress_event("shared_ingress", serde_json::json!({ "hello": "world" })),
)
.unwrap();
assert!(report.accepted);