feat: add plugin service output commands
This commit is contained in:
parent
89a40db79e
commit
755d460f0d
|
|
@ -2283,6 +2283,9 @@ const PLUGIN_WASM_MAX_SUMMARY_BYTES: usize = 1024;
|
||||||
const PLUGIN_WASM_FUEL: u64 = 5_000_000;
|
const PLUGIN_WASM_FUEL: u64 = 5_000_000;
|
||||||
const PLUGIN_WASM_TIMEOUT: Duration = Duration::from_secs(1);
|
const PLUGIN_WASM_TIMEOUT: Duration = Duration::from_secs(1);
|
||||||
const PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY: usize = 32;
|
const PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY: usize = 32;
|
||||||
|
const PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_COUNT: usize = 16;
|
||||||
|
const PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_PAYLOAD_BYTES: usize = 16 * 1024;
|
||||||
|
const PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_RESULTS: usize = 32;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
const PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT: Duration = Duration::from_millis(25);
|
const PLUGIN_SERVICE_INGRESS_DISPATCH_TIMEOUT: Duration = Duration::from_millis(25);
|
||||||
#[cfg(not(test))]
|
#[cfg(not(test))]
|
||||||
|
|
@ -2447,6 +2450,22 @@ struct PluginWebSocketOpenRequest {
|
||||||
headers: Vec<PluginRequestHeader>,
|
headers: Vec<PluginRequestHeader>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
struct PluginServiceWebSocketSendCommandPayload {
|
||||||
|
url: String,
|
||||||
|
text: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
struct PluginServiceDiagnosticStatusCommandPayload {
|
||||||
|
#[serde(default)]
|
||||||
|
message: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
status: Option<Value>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
struct PluginWebSocketOpenResponse {
|
struct PluginWebSocketOpenResponse {
|
||||||
handle: u32,
|
handle: u32,
|
||||||
|
|
@ -2957,6 +2976,9 @@ pub enum PluginInstanceDiagnosticKind {
|
||||||
ServiceUnavailable,
|
ServiceUnavailable,
|
||||||
ServiceFailed,
|
ServiceFailed,
|
||||||
ServiceStopped,
|
ServiceStopped,
|
||||||
|
ServiceOutputCommandRecorded,
|
||||||
|
ServiceOutputCommandRejected,
|
||||||
|
ServiceOutputCommandUnsupported,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
|
||||||
|
|
@ -3002,6 +3024,9 @@ pub struct PluginInstanceStatus {
|
||||||
pub queue_capacity: usize,
|
pub queue_capacity: usize,
|
||||||
pub last_error: Option<String>,
|
pub last_error: Option<String>,
|
||||||
pub dispatch_counters: PluginIngressDispatchCounters,
|
pub dispatch_counters: PluginIngressDispatchCounters,
|
||||||
|
/// Last bounded Service output command outcomes. These are produced only by
|
||||||
|
/// Service/Ingress dispatch and are intentionally separate from ToolOutput.
|
||||||
|
pub output_command_results: Vec<PluginServiceOutputCommandResult>,
|
||||||
pub diagnostics: Vec<PluginInstanceDiagnostic>,
|
pub diagnostics: Vec<PluginInstanceDiagnostic>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -3035,12 +3060,138 @@ impl PluginIngressEvent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Host-validated output command envelope returned by Service/Ingress handlers.
|
||||||
|
///
|
||||||
|
/// Service output commands are intentionally distinct from ordinary plugin
|
||||||
|
/// `ToolOutput`: handlers can request bounded side effects, but the host parses,
|
||||||
|
/// validates, grant-checks, records diagnostics, and fail-closes before executing
|
||||||
|
/// any supported command.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub struct PluginServiceOutputCommandEnvelope {
|
||||||
|
pub correlation_id: String,
|
||||||
|
pub source_event_id: String,
|
||||||
|
pub command_id: String,
|
||||||
|
pub kind: PluginServiceOutputCommandKind,
|
||||||
|
pub payload: Value,
|
||||||
|
pub requested_at: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum PluginServiceOutputCommandKind {
|
||||||
|
DiagnosticStatusUpdate,
|
||||||
|
HostRequestDispatch,
|
||||||
|
#[serde(rename = "websocket_send")]
|
||||||
|
WebSocketSend,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginServiceOutputCommandKind {
|
||||||
|
fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::DiagnosticStatusUpdate => "diagnostic_status_update",
|
||||||
|
Self::HostRequestDispatch => "host_request_dispatch",
|
||||||
|
Self::WebSocketSend => "websocket_send",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
|
||||||
|
pub enum PluginServiceOutputCommandStatus {
|
||||||
|
Recorded,
|
||||||
|
Rejected,
|
||||||
|
Unsupported,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Serialize)]
|
||||||
|
pub struct PluginServiceOutputCommandResult {
|
||||||
|
pub correlation_id: Option<String>,
|
||||||
|
pub source_event_id: Option<String>,
|
||||||
|
pub command_id: Option<String>,
|
||||||
|
pub kind: Option<PluginServiceOutputCommandKind>,
|
||||||
|
pub status: PluginServiceOutputCommandStatus,
|
||||||
|
pub message: String,
|
||||||
|
pub recorded_at: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginServiceOutputCommandResult {
|
||||||
|
fn rejected(message: impl Into<String>) -> Self {
|
||||||
|
Self::from_parts(
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
PluginServiceOutputCommandStatus::Rejected,
|
||||||
|
message,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rejected_for(
|
||||||
|
command: &PluginServiceOutputCommandEnvelope,
|
||||||
|
message: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
|
Self::from_command(command, PluginServiceOutputCommandStatus::Rejected, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unsupported(
|
||||||
|
command: &PluginServiceOutputCommandEnvelope,
|
||||||
|
message: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
|
Self::from_command(
|
||||||
|
command,
|
||||||
|
PluginServiceOutputCommandStatus::Unsupported,
|
||||||
|
message,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn recorded(command: &PluginServiceOutputCommandEnvelope, message: impl Into<String>) -> Self {
|
||||||
|
Self::from_command(command, PluginServiceOutputCommandStatus::Recorded, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_command(
|
||||||
|
command: &PluginServiceOutputCommandEnvelope,
|
||||||
|
status: PluginServiceOutputCommandStatus,
|
||||||
|
message: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
|
Self::from_parts(
|
||||||
|
Some(command.correlation_id.clone()),
|
||||||
|
Some(command.source_event_id.clone()),
|
||||||
|
Some(command.command_id.clone()),
|
||||||
|
Some(command.kind),
|
||||||
|
status,
|
||||||
|
message,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_parts(
|
||||||
|
correlation_id: Option<String>,
|
||||||
|
source_event_id: Option<String>,
|
||||||
|
command_id: Option<String>,
|
||||||
|
kind: Option<PluginServiceOutputCommandKind>,
|
||||||
|
status: PluginServiceOutputCommandStatus,
|
||||||
|
message: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
correlation_id,
|
||||||
|
source_event_id,
|
||||||
|
command_id,
|
||||||
|
kind,
|
||||||
|
status,
|
||||||
|
message: bounded_message(redact_secret_like(&message.into())),
|
||||||
|
recorded_at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Serialize)]
|
#[derive(Clone, Debug, PartialEq, Serialize)]
|
||||||
pub struct PluginIngressDispatchReport {
|
pub struct PluginIngressDispatchReport {
|
||||||
pub plugin_ref: String,
|
pub plugin_ref: String,
|
||||||
pub ingress: String,
|
pub ingress: String,
|
||||||
pub accepted: bool,
|
pub accepted: bool,
|
||||||
pub output: Value,
|
pub output: Value,
|
||||||
|
/// Results of host-side Service output command parsing/validation/grant-checking.
|
||||||
|
/// This path never feeds ordinary plugin ToolOutput handling.
|
||||||
|
pub output_command_results: Vec<PluginServiceOutputCommandResult>,
|
||||||
pub queue_depth: usize,
|
pub queue_depth: usize,
|
||||||
pub dispatch_counters: PluginIngressDispatchCounters,
|
pub dispatch_counters: PluginIngressDispatchCounters,
|
||||||
pub diagnostics: Vec<PluginInstanceDiagnostic>,
|
pub diagnostics: Vec<PluginInstanceDiagnostic>,
|
||||||
|
|
@ -3185,6 +3336,7 @@ impl PluginInstanceHandle {
|
||||||
ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY,
|
ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY,
|
||||||
dispatch_counters: PluginIngressDispatchCounters::default(),
|
dispatch_counters: PluginIngressDispatchCounters::default(),
|
||||||
last_error: None,
|
last_error: None,
|
||||||
|
output_command_results: Vec::new(),
|
||||||
diagnostics: Vec::new(),
|
diagnostics: Vec::new(),
|
||||||
};
|
};
|
||||||
instance.start()?;
|
instance.start()?;
|
||||||
|
|
@ -3237,6 +3389,7 @@ struct PluginInstance {
|
||||||
ingress_queue_capacity: usize,
|
ingress_queue_capacity: usize,
|
||||||
dispatch_counters: PluginIngressDispatchCounters,
|
dispatch_counters: PluginIngressDispatchCounters,
|
||||||
last_error: Option<String>,
|
last_error: Option<String>,
|
||||||
|
output_command_results: Vec<PluginServiceOutputCommandResult>,
|
||||||
diagnostics: Vec<PluginInstanceDiagnostic>,
|
diagnostics: Vec<PluginInstanceDiagnostic>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -3507,10 +3660,12 @@ impl PluginInstance {
|
||||||
ingress_name: &str,
|
ingress_name: &str,
|
||||||
event: PluginIngressEvent,
|
event: PluginIngressEvent,
|
||||||
) -> Result<PluginIngressDispatchReport, PluginWasmError> {
|
) -> Result<PluginIngressDispatchReport, PluginWasmError> {
|
||||||
match &mut self.runtime {
|
let output = match &mut self.runtime {
|
||||||
PluginInstanceRuntime::ComponentToolAdapter => Err(PluginWasmError::Module(
|
PluginInstanceRuntime::ComponentToolAdapter => {
|
||||||
"component tool runtime does not expose ingress dispatch".to_string(),
|
return Err(PluginWasmError::Module(
|
||||||
)),
|
"component tool runtime does not expose ingress dispatch".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
PluginInstanceRuntime::TestIngress {
|
PluginInstanceRuntime::TestIngress {
|
||||||
tool_calls,
|
tool_calls,
|
||||||
|
|
@ -3525,40 +3680,40 @@ impl PluginInstance {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
*ingress_calls += 1;
|
*ingress_calls += 1;
|
||||||
let output = serde_json::json!({
|
let mut output = serde_json::json!({
|
||||||
"ingress": ingress_name,
|
"ingress": ingress_name,
|
||||||
"kind": event.kind,
|
"kind": event.kind.clone(),
|
||||||
"source": event.source,
|
"source": event.source.clone(),
|
||||||
"ingress_name": event.ingress_name,
|
"ingress_name": event.ingress_name.clone(),
|
||||||
"attempt": event.attempt,
|
"attempt": event.attempt,
|
||||||
"correlation_id": event.correlation_id,
|
"correlation_id": event.correlation_id.clone(),
|
||||||
"calls": *tool_calls,
|
"calls": *tool_calls,
|
||||||
"ingress_calls": *ingress_calls,
|
"ingress_calls": *ingress_calls,
|
||||||
"payload": event.payload,
|
"payload": event.payload.clone(),
|
||||||
});
|
});
|
||||||
Ok(PluginIngressDispatchReport {
|
if let (Some(map), Some(commands)) = (
|
||||||
plugin_ref: self.record.identity.to_string(),
|
output.as_object_mut(),
|
||||||
ingress: ingress_name.to_string(),
|
event.payload.get("output_commands").cloned(),
|
||||||
accepted: true,
|
) {
|
||||||
output,
|
map.insert("output_commands".to_string(), commands);
|
||||||
queue_depth: self.ingress_queue.len(),
|
}
|
||||||
dispatch_counters: self.dispatch_counters.clone(),
|
output
|
||||||
diagnostics: self.diagnostics.clone(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
PluginInstanceRuntime::ComponentInstance(runtime) => {
|
PluginInstanceRuntime::ComponentInstance(runtime) => {
|
||||||
let output = runtime.handle_ingress(ingress_name, &event)?;
|
runtime.handle_ingress(ingress_name, &event)?
|
||||||
Ok(PluginIngressDispatchReport {
|
|
||||||
plugin_ref: self.record.identity.to_string(),
|
|
||||||
ingress: ingress_name.to_string(),
|
|
||||||
accepted: true,
|
|
||||||
output,
|
|
||||||
queue_depth: self.ingress_queue.len(),
|
|
||||||
dispatch_counters: self.dispatch_counters.clone(),
|
|
||||||
diagnostics: self.diagnostics.clone(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
let output_command_results = self.process_service_output_commands(&output, &event);
|
||||||
|
Ok(PluginIngressDispatchReport {
|
||||||
|
plugin_ref: self.record.identity.to_string(),
|
||||||
|
ingress: ingress_name.to_string(),
|
||||||
|
accepted: true,
|
||||||
|
output,
|
||||||
|
output_command_results,
|
||||||
|
queue_depth: self.ingress_queue.len(),
|
||||||
|
dispatch_counters: self.dispatch_counters.clone(),
|
||||||
|
diagnostics: self.diagnostics.clone(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&mut self) -> Result<(), PluginWasmError> {
|
fn stop(&mut self) -> Result<(), PluginWasmError> {
|
||||||
|
|
@ -3608,6 +3763,7 @@ impl PluginInstance {
|
||||||
queue_capacity: self.ingress_queue_capacity,
|
queue_capacity: self.ingress_queue_capacity,
|
||||||
last_error: self.last_error.clone(),
|
last_error: self.last_error.clone(),
|
||||||
dispatch_counters: self.dispatch_counters.clone(),
|
dispatch_counters: self.dispatch_counters.clone(),
|
||||||
|
output_command_results: self.output_command_results.clone(),
|
||||||
diagnostics: self.diagnostics.clone(),
|
diagnostics: self.diagnostics.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -3656,6 +3812,262 @@ impl PluginInstance {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_service_output_commands(
|
||||||
|
&mut self,
|
||||||
|
output: &Value,
|
||||||
|
event: &PluginIngressEvent,
|
||||||
|
) -> Vec<PluginServiceOutputCommandResult> {
|
||||||
|
let results = match self.validate_service_output_commands(output, event) {
|
||||||
|
Ok(commands) => {
|
||||||
|
let mut results = Vec::with_capacity(commands.len());
|
||||||
|
for command in commands {
|
||||||
|
results.push(self.execute_service_output_command(command));
|
||||||
|
}
|
||||||
|
results
|
||||||
|
}
|
||||||
|
Err(results) => results,
|
||||||
|
};
|
||||||
|
self.record_service_output_command_results(&results);
|
||||||
|
results
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_service_output_commands(
|
||||||
|
&self,
|
||||||
|
output: &Value,
|
||||||
|
event: &PluginIngressEvent,
|
||||||
|
) -> Result<Vec<PluginServiceOutputCommandEnvelope>, Vec<PluginServiceOutputCommandResult>>
|
||||||
|
{
|
||||||
|
let Some(values) = output.get("output_commands") else {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
};
|
||||||
|
let Some(values) = values.as_array() else {
|
||||||
|
return Err(vec![PluginServiceOutputCommandResult::rejected(
|
||||||
|
"service output_commands must be an array",
|
||||||
|
)]);
|
||||||
|
};
|
||||||
|
if values.len() > PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_COUNT {
|
||||||
|
return Err(vec![PluginServiceOutputCommandResult::rejected(format!(
|
||||||
|
"service output_commands exceeds {} commands",
|
||||||
|
PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_COUNT
|
||||||
|
))]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut commands = Vec::with_capacity(values.len());
|
||||||
|
let mut rejected = Vec::new();
|
||||||
|
for value in values {
|
||||||
|
let command: PluginServiceOutputCommandEnvelope =
|
||||||
|
match serde_json::from_value(value.clone()) {
|
||||||
|
Ok(command) => command,
|
||||||
|
Err(error) => {
|
||||||
|
rejected.push(PluginServiceOutputCommandResult::rejected(format!(
|
||||||
|
"malformed service output command envelope: {error}"
|
||||||
|
)));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(message) = self.validate_service_output_command_envelope(&command, event) {
|
||||||
|
rejected.push(PluginServiceOutputCommandResult::rejected_for(
|
||||||
|
&command, message,
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Err(message) = self.grant_check_service_output_command(&command) {
|
||||||
|
rejected.push(PluginServiceOutputCommandResult::rejected_for(
|
||||||
|
&command, message,
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
commands.push(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
if rejected.is_empty() {
|
||||||
|
Ok(commands)
|
||||||
|
} else {
|
||||||
|
Err(rejected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_service_output_command_envelope(
|
||||||
|
&self,
|
||||||
|
command: &PluginServiceOutputCommandEnvelope,
|
||||||
|
event: &PluginIngressEvent,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
validate_output_command_id("correlation_id", &command.correlation_id)?;
|
||||||
|
validate_output_command_id("source_event_id", &command.source_event_id)?;
|
||||||
|
validate_output_command_id("command_id", &command.command_id)?;
|
||||||
|
if command.source_event_id != event.correlation_id {
|
||||||
|
return Err("source_event_id must match the ingress event correlation_id".to_string());
|
||||||
|
}
|
||||||
|
chrono::DateTime::parse_from_rfc3339(&command.requested_at)
|
||||||
|
.map_err(|error| format!("requested_at must be RFC3339: {error}"))?;
|
||||||
|
let payload_bytes = serde_json::to_vec(&command.payload)
|
||||||
|
.map_err(|error| format!("payload is not serializable JSON: {error}"))?;
|
||||||
|
if payload_bytes.len() > PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_PAYLOAD_BYTES {
|
||||||
|
return Err(format!(
|
||||||
|
"payload exceeds {} bytes",
|
||||||
|
PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_PAYLOAD_BYTES
|
||||||
|
));
|
||||||
|
}
|
||||||
|
match command.kind {
|
||||||
|
PluginServiceOutputCommandKind::DiagnosticStatusUpdate => {
|
||||||
|
serde_json::from_value::<PluginServiceDiagnosticStatusCommandPayload>(
|
||||||
|
command.payload.clone(),
|
||||||
|
)
|
||||||
|
.map_err(|error| format!("invalid diagnostic_status_update payload: {error}"))?;
|
||||||
|
}
|
||||||
|
PluginServiceOutputCommandKind::HostRequestDispatch => {
|
||||||
|
let request: PluginRequestRequest = serde_json::from_value(command.payload.clone())
|
||||||
|
.map_err(|error| format!("invalid host_request_dispatch payload: {error}"))?;
|
||||||
|
validate_plugin_request_request(&self.record, &request)
|
||||||
|
.map_err(|error| format!("host_request_dispatch target denied: {}", error.0))?;
|
||||||
|
}
|
||||||
|
PluginServiceOutputCommandKind::WebSocketSend => {
|
||||||
|
let payload: PluginServiceWebSocketSendCommandPayload =
|
||||||
|
serde_json::from_value(command.payload.clone())
|
||||||
|
.map_err(|error| format!("invalid websocket_send payload: {error}"))?;
|
||||||
|
self.validate_service_websocket_send_payload(&payload)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn grant_check_service_output_command(
|
||||||
|
&self,
|
||||||
|
command: &PluginServiceOutputCommandEnvelope,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
match command.kind {
|
||||||
|
PluginServiceOutputCommandKind::DiagnosticStatusUpdate => Ok(()),
|
||||||
|
PluginServiceOutputCommandKind::HostRequestDispatch => {
|
||||||
|
authorize_plugin_host_api(&self.record, PluginHostApi::Request)
|
||||||
|
.map_err(|error| format!("host_request_dispatch not granted: {}", error.0))
|
||||||
|
}
|
||||||
|
PluginServiceOutputCommandKind::WebSocketSend => {
|
||||||
|
authorize_plugin_host_api(&self.record, PluginHostApi::WebSocket)
|
||||||
|
.map_err(|error| format!("websocket_send not granted: {}", error.0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_service_websocket_send_payload(
|
||||||
|
&self,
|
||||||
|
payload: &PluginServiceWebSocketSendCommandPayload,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
if payload.text.len() > PLUGIN_WEBSOCKET_MAX_MESSAGE_BYTES {
|
||||||
|
return Err(format!(
|
||||||
|
"websocket_send text exceeds {} bytes",
|
||||||
|
PLUGIN_WEBSOCKET_MAX_MESSAGE_BYTES
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let url = reqwest::Url::parse(&payload.url)
|
||||||
|
.map_err(|error| format!("invalid WebSocket URL: {error}"))?;
|
||||||
|
match url.scheme() {
|
||||||
|
"ws" | "wss" => {}
|
||||||
|
"http" | "https" => {
|
||||||
|
return Err("HTTP URLs are not supported by websocket_send".to_string());
|
||||||
|
}
|
||||||
|
scheme => {
|
||||||
|
return Err(format!(
|
||||||
|
"unsupported WebSocket URL scheme {scheme:?}; only ws and wss are allowed"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if url.host_str().is_none() {
|
||||||
|
return Err("WebSocket URL must include a host".to_string());
|
||||||
|
}
|
||||||
|
if !url.username().is_empty() || url.password().is_some() {
|
||||||
|
return Err("WebSocket URLs with embedded credentials are not allowed".to_string());
|
||||||
|
}
|
||||||
|
validate_static_request_target(&url).map_err(|error| error.0)?;
|
||||||
|
authorize_websocket_allowlist(&self.record, &url).map_err(|error| error.0)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn execute_service_output_command(
|
||||||
|
&mut self,
|
||||||
|
command: PluginServiceOutputCommandEnvelope,
|
||||||
|
) -> PluginServiceOutputCommandResult {
|
||||||
|
match command.kind {
|
||||||
|
PluginServiceOutputCommandKind::DiagnosticStatusUpdate => {
|
||||||
|
let payload: PluginServiceDiagnosticStatusCommandPayload =
|
||||||
|
match serde_json::from_value(command.payload.clone()) {
|
||||||
|
Ok(payload) => payload,
|
||||||
|
Err(error) => {
|
||||||
|
return PluginServiceOutputCommandResult::rejected_for(
|
||||||
|
&command,
|
||||||
|
format!("invalid diagnostic_status_update payload: {error}"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Some(status) = payload.status {
|
||||||
|
self.component_status = Some(status);
|
||||||
|
}
|
||||||
|
let message = payload
|
||||||
|
.message
|
||||||
|
.as_deref()
|
||||||
|
.map(bounded_message)
|
||||||
|
.unwrap_or_else(|| "plugin service status update recorded".to_string());
|
||||||
|
PluginServiceOutputCommandResult::recorded(&command, message)
|
||||||
|
}
|
||||||
|
PluginServiceOutputCommandKind::HostRequestDispatch => {
|
||||||
|
PluginServiceOutputCommandResult::unsupported(
|
||||||
|
&command,
|
||||||
|
"host_request_dispatch output command is grant-checked but transport dispatch is unsupported in v0",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
PluginServiceOutputCommandKind::WebSocketSend => {
|
||||||
|
PluginServiceOutputCommandResult::unsupported(
|
||||||
|
&command,
|
||||||
|
"websocket_send output command is grant-checked but WebSocket send transport is unsupported in v0",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn record_service_output_command_results(
|
||||||
|
&mut self,
|
||||||
|
results: &[PluginServiceOutputCommandResult],
|
||||||
|
) {
|
||||||
|
if results.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for result in results {
|
||||||
|
let kind = match result.status {
|
||||||
|
PluginServiceOutputCommandStatus::Recorded => {
|
||||||
|
PluginInstanceDiagnosticKind::ServiceOutputCommandRecorded
|
||||||
|
}
|
||||||
|
PluginServiceOutputCommandStatus::Rejected => {
|
||||||
|
PluginInstanceDiagnosticKind::ServiceOutputCommandRejected
|
||||||
|
}
|
||||||
|
PluginServiceOutputCommandStatus::Unsupported => {
|
||||||
|
PluginInstanceDiagnosticKind::ServiceOutputCommandUnsupported
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let command_label = result.command_id.as_deref().unwrap_or("<malformed>");
|
||||||
|
let command_kind = result
|
||||||
|
.kind
|
||||||
|
.map(PluginServiceOutputCommandKind::as_str)
|
||||||
|
.unwrap_or("unknown");
|
||||||
|
let message = bounded_message(format!(
|
||||||
|
"service output command {command_label} ({command_kind}): {}",
|
||||||
|
result.message
|
||||||
|
));
|
||||||
|
if !matches!(result.status, PluginServiceOutputCommandStatus::Recorded) {
|
||||||
|
self.last_error = Some(message.clone());
|
||||||
|
}
|
||||||
|
self.diagnostics.push(PluginInstanceDiagnostic::with_kind(
|
||||||
|
kind,
|
||||||
|
self.lifecycle.clone(),
|
||||||
|
message,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
self.output_command_results.extend_from_slice(results);
|
||||||
|
if self.output_command_results.len() > PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_RESULTS {
|
||||||
|
let keep_from =
|
||||||
|
self.output_command_results.len() - PLUGIN_SERVICE_OUTPUT_COMMAND_MAX_RESULTS;
|
||||||
|
self.output_command_results.drain(0..keep_from);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn record_runtime_error(
|
fn record_runtime_error(
|
||||||
&mut self,
|
&mut self,
|
||||||
kind: PluginInstanceDiagnosticKind,
|
kind: PluginInstanceDiagnosticKind,
|
||||||
|
|
@ -4413,6 +4825,15 @@ fn bounded_message(message: impl Into<String>) -> String {
|
||||||
sanitized
|
sanitized
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn validate_output_command_id(field: &str, value: &str) -> Result<(), String> {
|
||||||
|
if value.is_empty() || value.len() > 128 || value.chars().any(char::is_control) {
|
||||||
|
return Err(format!(
|
||||||
|
"{field} is empty, too long, or contains control characters"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn validate_declared_tool_names(record: &ResolvedPluginRecord) -> Result<(), FeatureInstallError> {
|
fn validate_declared_tool_names(record: &ResolvedPluginRecord) -> Result<(), FeatureInstallError> {
|
||||||
let mut seen = HashSet::new();
|
let mut seen = HashSet::new();
|
||||||
for tool in &record.manifest.tools {
|
for tool in &record.manifest.tools {
|
||||||
|
|
@ -4818,6 +5239,51 @@ mod tests {
|
||||||
PluginIngressEvent::new(ingress_name, "test", "unit", payload)
|
PluginIngressEvent::new(ingress_name, "test", "unit", payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn service_output_command(
|
||||||
|
event: &PluginIngressEvent,
|
||||||
|
command_id: &str,
|
||||||
|
kind: &str,
|
||||||
|
payload: Value,
|
||||||
|
) -> Value {
|
||||||
|
json!({
|
||||||
|
"correlation_id": event.correlation_id.clone(),
|
||||||
|
"source_event_id": event.correlation_id.clone(),
|
||||||
|
"command_id": command_id,
|
||||||
|
"kind": kind,
|
||||||
|
"payload": payload,
|
||||||
|
"requested_at": event.created_at.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_request_output_grant(record: &mut ResolvedPluginRecord) {
|
||||||
|
let permission = PluginPermission::host_api(PluginHostApi::Request);
|
||||||
|
record.manifest.permissions.push(permission.clone());
|
||||||
|
record.grants.permissions.push(permission);
|
||||||
|
let target = PluginRequestGrant {
|
||||||
|
scheme: "https".to_string(),
|
||||||
|
host: "api.example.test".to_string(),
|
||||||
|
port: None,
|
||||||
|
methods: vec!["POST".to_string()],
|
||||||
|
path_prefixes: vec!["/v1".to_string()],
|
||||||
|
};
|
||||||
|
record.manifest.request.push(target.clone());
|
||||||
|
record.grants.request.push(target);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_websocket_output_grant(record: &mut ResolvedPluginRecord) {
|
||||||
|
let permission = PluginPermission::host_api(PluginHostApi::WebSocket);
|
||||||
|
record.manifest.permissions.push(permission.clone());
|
||||||
|
record.grants.permissions.push(permission);
|
||||||
|
let target = PluginWebSocketGrant {
|
||||||
|
scheme: "wss".to_string(),
|
||||||
|
host: "ws.example.test".to_string(),
|
||||||
|
port: None,
|
||||||
|
path_prefixes: vec!["/events".to_string()],
|
||||||
|
};
|
||||||
|
record.manifest.websocket.push(target.clone());
|
||||||
|
record.grants.websocket.push(target);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn service_selected_ignores_unselected_tool_without_grants() {
|
fn service_selected_ignores_unselected_tool_without_grants() {
|
||||||
let mut record = record(vec![tool("hidden_tool")]);
|
let mut record = record(vec![tool("hidden_tool")]);
|
||||||
|
|
@ -5060,6 +5526,147 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn service_output_command_records_diagnostic_status_separately_from_tool_output() {
|
||||||
|
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
|
||||||
|
let mut event = test_ingress_event("shared_ingress", json!({}));
|
||||||
|
let command = service_output_command(
|
||||||
|
&event,
|
||||||
|
"cmd-status",
|
||||||
|
"diagnostic_status_update",
|
||||||
|
json!({
|
||||||
|
"message": "service became ready",
|
||||||
|
"status": {"ready": true}
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
event.payload = json!({"output_commands": [command]});
|
||||||
|
|
||||||
|
let report = handle.deliver_ingress("shared_ingress", event).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(report.output_command_results.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
report.output_command_results[0].kind,
|
||||||
|
Some(PluginServiceOutputCommandKind::DiagnosticStatusUpdate)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
report.output_command_results[0].status,
|
||||||
|
PluginServiceOutputCommandStatus::Recorded
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
report.output["payload"]["output_commands"]
|
||||||
|
.as_array()
|
||||||
|
.unwrap()
|
||||||
|
.len(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
let status = handle.status();
|
||||||
|
assert_eq!(status.component_status, Some(json!({"ready": true})));
|
||||||
|
assert_eq!(status.output_command_results.len(), 1);
|
||||||
|
assert!(status.diagnostics.iter().any(|diagnostic| {
|
||||||
|
diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandRecorded
|
||||||
|
&& diagnostic.message.contains("cmd-status")
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn service_output_command_rejects_ungranted_request_without_executing_status_update() {
|
||||||
|
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
|
||||||
|
let mut event = test_ingress_event("shared_ingress", json!({}));
|
||||||
|
let status_command = service_output_command(
|
||||||
|
&event,
|
||||||
|
"cmd-status",
|
||||||
|
"diagnostic_status_update",
|
||||||
|
json!({"status": {"should_not_record": true}}),
|
||||||
|
);
|
||||||
|
let request_command = service_output_command(
|
||||||
|
&event,
|
||||||
|
"cmd-request",
|
||||||
|
"host_request_dispatch",
|
||||||
|
json!({
|
||||||
|
"method": "POST",
|
||||||
|
"url": "https://api.example.test/v1/events"
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
event.payload = json!({"output_commands": [status_command, request_command]});
|
||||||
|
|
||||||
|
let report = handle.deliver_ingress("shared_ingress", event).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(report.output_command_results.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
report.output_command_results[0].status,
|
||||||
|
PluginServiceOutputCommandStatus::Rejected
|
||||||
|
);
|
||||||
|
assert_eq!(handle.status().component_status, None);
|
||||||
|
assert!(handle.status().diagnostics.iter().any(|diagnostic| {
|
||||||
|
diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandRejected
|
||||||
|
&& diagnostic.message.contains("cmd-request")
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn service_output_command_placeholders_are_grant_checked_and_unsupported() {
|
||||||
|
let mut record = test_service_ingress_record();
|
||||||
|
add_request_output_grant(&mut record);
|
||||||
|
add_websocket_output_grant(&mut record);
|
||||||
|
let handle = PluginInstanceHandle::new(record).unwrap();
|
||||||
|
let mut event = test_ingress_event("shared_ingress", json!({}));
|
||||||
|
let request_command = service_output_command(
|
||||||
|
&event,
|
||||||
|
"cmd-request",
|
||||||
|
"host_request_dispatch",
|
||||||
|
json!({
|
||||||
|
"method": "POST",
|
||||||
|
"url": "https://api.example.test/v1/events"
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
let websocket_command = service_output_command(
|
||||||
|
&event,
|
||||||
|
"cmd-websocket",
|
||||||
|
"websocket_send",
|
||||||
|
json!({
|
||||||
|
"url": "wss://ws.example.test/events",
|
||||||
|
"text": "hello"
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
event.payload = json!({"output_commands": [request_command, websocket_command]});
|
||||||
|
|
||||||
|
let report = handle.deliver_ingress("shared_ingress", event).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(report.output_command_results.len(), 2);
|
||||||
|
assert!(
|
||||||
|
report
|
||||||
|
.output_command_results
|
||||||
|
.iter()
|
||||||
|
.all(|result| { result.status == PluginServiceOutputCommandStatus::Unsupported })
|
||||||
|
);
|
||||||
|
let status = handle.status();
|
||||||
|
assert_eq!(status.output_command_results.len(), 2);
|
||||||
|
assert!(status.diagnostics.iter().any(|diagnostic| {
|
||||||
|
diagnostic.kind == PluginInstanceDiagnosticKind::ServiceOutputCommandUnsupported
|
||||||
|
&& diagnostic.message.contains("cmd-websocket")
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn service_output_command_rejects_malformed_envelope() {
|
||||||
|
let handle = PluginInstanceHandle::new(test_service_ingress_record()).unwrap();
|
||||||
|
let mut event = test_ingress_event("shared_ingress", json!({}));
|
||||||
|
event.payload = json!({"output_commands": [{"kind": "diagnostic_status_update"}]});
|
||||||
|
|
||||||
|
let report = handle.deliver_ingress("shared_ingress", event).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(report.output_command_results.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
report.output_command_results[0].status,
|
||||||
|
PluginServiceOutputCommandStatus::Rejected
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
report.output_command_results[0]
|
||||||
|
.message
|
||||||
|
.contains("malformed service output command envelope")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn installed_ingress_dispatch_uses_retained_shared_instance() {
|
fn installed_ingress_dispatch_uses_retained_shared_instance() {
|
||||||
let mut record = record(vec![tool("shared_tool")]);
|
let mut record = record(vec![tool("shared_tool")]);
|
||||||
|
|
@ -5144,6 +5751,7 @@ mod tests {
|
||||||
ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY,
|
ingress_queue_capacity: PLUGIN_SERVICE_INGRESS_QUEUE_CAPACITY,
|
||||||
dispatch_counters: PluginIngressDispatchCounters::default(),
|
dispatch_counters: PluginIngressDispatchCounters::default(),
|
||||||
last_error: None,
|
last_error: None,
|
||||||
|
output_command_results: Vec::new(),
|
||||||
diagnostics: Vec::new(),
|
diagnostics: Vec::new(),
|
||||||
})));
|
})));
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user