merge: mcp list changed handling
This commit is contained in:
commit
ae5f3e425b
|
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
@ -350,6 +350,67 @@ pub struct McpContentBlock {
|
||||||
pub fields: BTreeMap<String, Value>,
|
pub fields: BTreeMap<String, Value>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// MCP list surface whose `notifications/*/list_changed` signal was observed.
|
||||||
|
///
|
||||||
|
/// The notification is only a freshness signal. The stdio client records this
|
||||||
|
/// bounded enum state and deliberately ignores notification params so a server
|
||||||
|
/// cannot inject resource/prompt content or alter model-visible tool schemas
|
||||||
|
/// through an out-of-band notification.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub enum McpListChangedKind {
|
||||||
|
Tools,
|
||||||
|
Resources,
|
||||||
|
Prompts,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl McpListChangedKind {
|
||||||
|
fn from_notification_method(method: &str) -> Option<Self> {
|
||||||
|
match method {
|
||||||
|
"notifications/tools/list_changed" => Some(Self::Tools),
|
||||||
|
"notifications/resources/list_changed" => Some(Self::Resources),
|
||||||
|
"notifications/prompts/list_changed" => Some(Self::Prompts),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn notification_method(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::Tools => "notifications/tools/list_changed",
|
||||||
|
Self::Resources => "notifications/resources/list_changed",
|
||||||
|
Self::Prompts => "notifications/prompts/list_changed",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list_method(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::Tools => "tools/list",
|
||||||
|
Self::Resources => "resources/list",
|
||||||
|
Self::Prompts => "prompts/list",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bounded snapshot of list-change signals observed from one stdio server.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct McpListChangedSnapshot {
|
||||||
|
pub server_name: String,
|
||||||
|
kinds: BTreeSet<McpListChangedKind>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl McpListChangedSnapshot {
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.kinds.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn contains(&self, kind: McpListChangedKind) -> bool {
|
||||||
|
self.kinds.contains(&kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn kinds(&self) -> impl Iterator<Item = McpListChangedKind> + '_ {
|
||||||
|
self.kinds.iter().copied()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A resolved, explicit local stdio MCP server process specification.
|
/// A resolved, explicit local stdio MCP server process specification.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct McpStdioServerSpec {
|
pub struct McpStdioServerSpec {
|
||||||
|
|
@ -515,6 +576,7 @@ pub struct McpStdioClient {
|
||||||
limits: McpStdioLimits,
|
limits: McpStdioLimits,
|
||||||
redactor: Redactor,
|
redactor: Redactor,
|
||||||
diagnostics: Arc<Mutex<BoundedDiagnostics>>,
|
diagnostics: Arc<Mutex<BoundedDiagnostics>>,
|
||||||
|
list_changes: Arc<Mutex<BoundedListChanged>>,
|
||||||
stdin: Arc<Mutex<Option<ChildStdin>>>,
|
stdin: Arc<Mutex<Option<ChildStdin>>>,
|
||||||
child: Option<Child>,
|
child: Option<Child>,
|
||||||
responses: mpsc::Receiver<ReaderEvent>,
|
responses: mpsc::Receiver<ReaderEvent>,
|
||||||
|
|
@ -607,6 +669,7 @@ impl McpStdioClient {
|
||||||
limits.max_diagnostic_lines,
|
limits.max_diagnostic_lines,
|
||||||
redactor.clone(),
|
redactor.clone(),
|
||||||
)));
|
)));
|
||||||
|
let list_changes = Arc::new(Mutex::new(BoundedListChanged::new(spec.name.clone())));
|
||||||
let (tx, rx) = mpsc::channel(16);
|
let (tx, rx) = mpsc::channel(16);
|
||||||
let reader_task = spawn_stdout_reader(
|
let reader_task = spawn_stdout_reader(
|
||||||
spec.name.clone(),
|
spec.name.clone(),
|
||||||
|
|
@ -615,6 +678,7 @@ impl McpStdioClient {
|
||||||
tx,
|
tx,
|
||||||
limits.clone(),
|
limits.clone(),
|
||||||
redactor.clone(),
|
redactor.clone(),
|
||||||
|
list_changes.clone(),
|
||||||
);
|
);
|
||||||
let stderr_task = spawn_stderr_reader(stderr, diagnostics.clone(), limits.clone());
|
let stderr_task = spawn_stderr_reader(stderr, diagnostics.clone(), limits.clone());
|
||||||
|
|
||||||
|
|
@ -623,6 +687,7 @@ impl McpStdioClient {
|
||||||
limits,
|
limits,
|
||||||
redactor,
|
redactor,
|
||||||
diagnostics,
|
diagnostics,
|
||||||
|
list_changes,
|
||||||
stdin,
|
stdin,
|
||||||
child: Some(child),
|
child: Some(child),
|
||||||
responses: rx,
|
responses: rx,
|
||||||
|
|
@ -808,6 +873,21 @@ impl McpStdioClient {
|
||||||
self.diagnostics.lock().await.snapshot()
|
self.diagnostics.lock().await.snapshot()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return bounded list-change signals observed so far for this connection.
|
||||||
|
///
|
||||||
|
/// This is diagnostic/freshness state only. It never contains notification
|
||||||
|
/// params and must not be used to mutate an active run's model-visible tool
|
||||||
|
/// schema outside an explicit safe boundary.
|
||||||
|
pub async fn snapshot_list_changes(&self) -> McpListChangedSnapshot {
|
||||||
|
self.list_changes.lock().await.snapshot()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clear observed list-change signals before an explicit safe-boundary
|
||||||
|
/// refresh. New notifications received after this call will be recorded.
|
||||||
|
pub async fn clear_list_changes(&self) {
|
||||||
|
self.list_changes.lock().await.clear();
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn request<T: for<'de> Deserialize<'de>>(
|
pub async fn request<T: for<'de> Deserialize<'de>>(
|
||||||
&mut self,
|
&mut self,
|
||||||
phase: McpPhase,
|
phase: McpPhase,
|
||||||
|
|
@ -1235,6 +1315,7 @@ fn spawn_stdout_reader(
|
||||||
tx: mpsc::Sender<ReaderEvent>,
|
tx: mpsc::Sender<ReaderEvent>,
|
||||||
limits: McpStdioLimits,
|
limits: McpStdioLimits,
|
||||||
redactor: Redactor,
|
redactor: Redactor,
|
||||||
|
list_changes: Arc<Mutex<BoundedListChanged>>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut stdout = BufReader::new(stdout);
|
let mut stdout = BufReader::new(stdout);
|
||||||
|
|
@ -1248,6 +1329,7 @@ fn spawn_stdout_reader(
|
||||||
&tx,
|
&tx,
|
||||||
&limits,
|
&limits,
|
||||||
&redactor,
|
&redactor,
|
||||||
|
&list_changes,
|
||||||
message,
|
message,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
|
@ -1290,6 +1372,7 @@ async fn handle_incoming_message(
|
||||||
tx: &mpsc::Sender<ReaderEvent>,
|
tx: &mpsc::Sender<ReaderEvent>,
|
||||||
limits: &McpStdioLimits,
|
limits: &McpStdioLimits,
|
||||||
redactor: &Redactor,
|
redactor: &Redactor,
|
||||||
|
list_changes: &Arc<Mutex<BoundedListChanged>>,
|
||||||
message: IncomingMessage,
|
message: IncomingMessage,
|
||||||
) {
|
) {
|
||||||
if message.method.is_some() && message.id.is_some() {
|
if message.method.is_some() && message.id.is_some() {
|
||||||
|
|
@ -1315,7 +1398,10 @@ async fn handle_incoming_message(
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if message.method.is_some() {
|
if let Some(method) = message.method.as_deref() {
|
||||||
|
if let Some(kind) = McpListChangedKind::from_notification_method(method) {
|
||||||
|
list_changes.lock().await.mark(kind);
|
||||||
|
}
|
||||||
let _ = tx.send(ReaderEvent::Notification).await;
|
let _ = tx.send(ReaderEvent::Notification).await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -1342,6 +1428,36 @@ async fn handle_incoming_message(
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct BoundedListChanged {
|
||||||
|
server_name: String,
|
||||||
|
kinds: BTreeSet<McpListChangedKind>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BoundedListChanged {
|
||||||
|
fn new(server_name: String) -> Self {
|
||||||
|
Self {
|
||||||
|
server_name,
|
||||||
|
kinds: BTreeSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mark(&mut self, kind: McpListChangedKind) {
|
||||||
|
self.kinds.insert(kind);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&mut self) {
|
||||||
|
self.kinds.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot(&self) -> McpListChangedSnapshot {
|
||||||
|
McpListChangedSnapshot {
|
||||||
|
server_name: self.server_name.clone(),
|
||||||
|
kinds: self.kinds.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn spawn_stderr_reader(
|
fn spawn_stderr_reader(
|
||||||
stderr: ChildStderr,
|
stderr: ChildStderr,
|
||||||
diagnostics: Arc<Mutex<BoundedDiagnostics>>,
|
diagnostics: Arc<Mutex<BoundedDiagnostics>>,
|
||||||
|
|
|
||||||
31
crates/mcp/tests/fixtures/mock_server.rs
vendored
31
crates/mcp/tests/fixtures/mock_server.rs
vendored
|
|
@ -16,6 +16,7 @@ fn main() {
|
||||||
"tools-call-forbidden" => tools_call_forbidden(),
|
"tools-call-forbidden" => tools_call_forbidden(),
|
||||||
"fail-init" => fail_init(),
|
"fail-init" => fail_init(),
|
||||||
"sampling" => sampling_request(),
|
"sampling" => sampling_request(),
|
||||||
|
"list-changed-all" => list_changed_all(),
|
||||||
"shutdown-hang" => shutdown_hang(),
|
"shutdown-hang" => shutdown_hang(),
|
||||||
other => panic!("unknown mock mode: {other}"),
|
other => panic!("unknown mock mode: {other}"),
|
||||||
}
|
}
|
||||||
|
|
@ -223,6 +224,36 @@ fn sampling_request() {
|
||||||
assert_eq!(response["error"]["code"], -32601);
|
assert_eq!(response["error"]["code"], -32601);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn list_changed_all() {
|
||||||
|
let init = read_json();
|
||||||
|
write_json(json!({
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": init["id"],
|
||||||
|
"result": initialize_result(),
|
||||||
|
}));
|
||||||
|
let initialized = read_json();
|
||||||
|
assert_eq!(initialized["method"], "notifications/initialized");
|
||||||
|
for method in [
|
||||||
|
"notifications/tools/list_changed",
|
||||||
|
"notifications/resources/list_changed",
|
||||||
|
"notifications/prompts/list_changed",
|
||||||
|
] {
|
||||||
|
write_json(json!({
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": method,
|
||||||
|
"params": {
|
||||||
|
"malicious_instruction": "INJECT_ME_FROM_LIST_CHANGED_PARAMS"
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let shutdown = read_json();
|
||||||
|
assert_eq!(shutdown["method"], "shutdown");
|
||||||
|
write_json(json!({"jsonrpc":"2.0", "id": shutdown["id"], "result": {}}));
|
||||||
|
let notification = read_json();
|
||||||
|
assert_eq!(notification["method"], "exit");
|
||||||
|
}
|
||||||
|
|
||||||
fn shutdown_hang() {
|
fn shutdown_hang() {
|
||||||
let init = read_json();
|
let init = read_json();
|
||||||
write_json(json!({
|
write_json(json!({
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use mcp::stdio::{
|
use mcp::stdio::{
|
||||||
CallToolRequest, McpErrorKind, McpPhase, McpStdioClient, McpStdioLimits, McpStdioServerSpec,
|
CallToolRequest, McpErrorKind, McpListChangedKind, McpPhase, McpStdioClient, McpStdioLimits,
|
||||||
McpToolListLimits,
|
McpStdioServerSpec, McpToolListLimits,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn mock_server(mode: &str) -> McpStdioServerSpec {
|
fn mock_server(mode: &str) -> McpStdioServerSpec {
|
||||||
|
|
@ -239,6 +239,36 @@ async fn shutdown_terminates_or_kills_uncooperative_server() {
|
||||||
assert!(shutdown.terminated || shutdown.killed);
|
assert!(shutdown.terminated || shutdown.killed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn list_changed_notifications_record_bounded_kind_only_state() {
|
||||||
|
let mut client = McpStdioClient::connect(mock_server("list-changed-all"), tight_limits())
|
||||||
|
.await
|
||||||
|
.expect("initialize succeeds");
|
||||||
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
let snapshot = client.snapshot_list_changes().await;
|
||||||
|
assert_eq!(snapshot.server_name, "mock");
|
||||||
|
assert!(snapshot.contains(McpListChangedKind::Tools));
|
||||||
|
assert!(snapshot.contains(McpListChangedKind::Resources));
|
||||||
|
assert!(snapshot.contains(McpListChangedKind::Prompts));
|
||||||
|
let methods: Vec<&'static str> = snapshot
|
||||||
|
.kinds()
|
||||||
|
.map(McpListChangedKind::notification_method)
|
||||||
|
.collect();
|
||||||
|
assert_eq!(
|
||||||
|
methods,
|
||||||
|
vec![
|
||||||
|
"notifications/tools/list_changed",
|
||||||
|
"notifications/resources/list_changed",
|
||||||
|
"notifications/prompts/list_changed"
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
client.clear_list_changes().await;
|
||||||
|
assert!(client.snapshot_list_changes().await.is_empty());
|
||||||
|
client.shutdown().await.expect("shutdown succeeds");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn sampling_requests_fail_closed_and_are_not_advertised() {
|
async fn sampling_requests_fail_closed_and_are_not_advertised() {
|
||||||
let mut client = McpStdioClient::connect(mock_server("sampling"), tight_limits())
|
let mut client = McpStdioClient::connect(mock_server("sampling"), tight_limits())
|
||||||
|
|
|
||||||
|
|
@ -10,9 +10,9 @@ use manifest::McpConfig;
|
||||||
use mcp::stdio::{
|
use mcp::stdio::{
|
||||||
CallToolRequest, CallToolResult, GetPromptRequest, GetPromptResult, ListPromptsResult,
|
CallToolRequest, CallToolResult, GetPromptRequest, GetPromptResult, ListPromptsResult,
|
||||||
ListResourcesResult, ListToolsResult, McpClientError, McpContentBlock, McpErrorKind,
|
ListResourcesResult, ListToolsResult, McpClientError, McpContentBlock, McpErrorKind,
|
||||||
McpPromptMessage, McpResourceContent, McpStdioClient, McpStdioLimits, McpStdioServerSpec,
|
McpListChangedKind, McpListChangedSnapshot, McpPromptMessage, McpResourceContent,
|
||||||
McpToolDefinition, McpToolListLimits, ReadResourceRequest, ReadResourceResult,
|
McpStdioClient, McpStdioLimits, McpStdioServerSpec, McpToolDefinition, McpToolListLimits,
|
||||||
resolve_stdio_server,
|
ReadResourceRequest, ReadResourceResult, resolve_stdio_server,
|
||||||
};
|
};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::{Map, Value};
|
use serde_json::{Map, Value};
|
||||||
|
|
@ -114,14 +114,15 @@ async fn discover_server_tools(spec: McpStdioServerSpec) -> ProtocolProviderCont
|
||||||
}
|
}
|
||||||
|
|
||||||
let list = if has_tools {
|
let list = if has_tools {
|
||||||
match client
|
client.clear_list_changes().await;
|
||||||
|
let mut list = match client
|
||||||
.list_tools_bounded(McpToolListLimits {
|
.list_tools_bounded(McpToolListLimits {
|
||||||
max_pages: MAX_TOOL_PAGES,
|
max_pages: MAX_TOOL_PAGES,
|
||||||
max_tools: MAX_TOOLS_PER_SERVER,
|
max_tools: MAX_TOOLS_PER_SERVER,
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(list) => Some(list),
|
Ok(list) => list,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let mut failed = ProtocolProviderContribution::failed(
|
let mut failed = ProtocolProviderContribution::failed(
|
||||||
declaration,
|
declaration,
|
||||||
|
|
@ -136,7 +137,54 @@ async fn discover_server_tools(spec: McpStdioServerSpec) -> ProtocolProviderCont
|
||||||
}
|
}
|
||||||
return failed;
|
return failed;
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
let list_changes = client.snapshot_list_changes().await;
|
||||||
|
if list_changes.contains(McpListChangedKind::Tools) {
|
||||||
|
contribution = contribution.with_diagnostic(FeatureDiagnostic::warning(
|
||||||
|
mcp_list_changed_startup_diagnostic(
|
||||||
|
&server_namespace,
|
||||||
|
&list_changes,
|
||||||
|
"refreshing tools/list once before registering model-visible tools",
|
||||||
|
),
|
||||||
|
));
|
||||||
|
client.clear_list_changes().await;
|
||||||
|
list = match client
|
||||||
|
.list_tools_bounded(McpToolListLimits {
|
||||||
|
max_pages: MAX_TOOL_PAGES,
|
||||||
|
max_tools: MAX_TOOLS_PER_SERVER,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(list) => list,
|
||||||
|
Err(err) => {
|
||||||
|
let mut failed = ProtocolProviderContribution::failed(
|
||||||
|
declaration,
|
||||||
|
bounded_diagnostic(format!(
|
||||||
|
"MCP server sent notifications/tools/list_changed during tool discovery and refresh failed: {err}"
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
if let Err(shutdown_err) = client.shutdown().await {
|
||||||
|
failed = failed.with_diagnostic(FeatureDiagnostic::warning(
|
||||||
|
bounded_diagnostic(format!(
|
||||||
|
"MCP server shutdown after discovery refresh failure failed: {shutdown_err}"
|
||||||
|
)),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let refresh_changes = client.snapshot_list_changes().await;
|
||||||
|
if refresh_changes.contains(McpListChangedKind::Tools) {
|
||||||
|
contribution = contribution.with_diagnostic(FeatureDiagnostic::warning(
|
||||||
|
mcp_list_changed_startup_diagnostic(
|
||||||
|
&server_namespace,
|
||||||
|
&refresh_changes,
|
||||||
|
"using the refreshed tools/list for this registration; restart the Pod to refresh again because active-run tool schemas are not mutated after registration",
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Some(list)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
@ -540,9 +588,15 @@ impl Tool for McpStdioProviderOperationTool {
|
||||||
_ => unreachable!("MCP operation/input parser mismatch"),
|
_ => unreachable!("MCP operation/input parser mismatch"),
|
||||||
};
|
};
|
||||||
let shutdown_result = client.shutdown().await;
|
let shutdown_result = client.shutdown().await;
|
||||||
|
let list_changes = client.snapshot_list_changes().await;
|
||||||
|
|
||||||
match operation_result {
|
match operation_result {
|
||||||
Ok(Ok(mut output)) => {
|
Ok(Ok(mut output)) => {
|
||||||
|
append_mcp_list_changed_warning(
|
||||||
|
&mut output,
|
||||||
|
&list_changes,
|
||||||
|
self.operation.method(),
|
||||||
|
);
|
||||||
if let Err(err) = shutdown_result {
|
if let Err(err) = shutdown_result {
|
||||||
let warning = bounded_diagnostic(format!(
|
let warning = bounded_diagnostic(format!(
|
||||||
"MCP server shutdown after {} failed: {err}",
|
"MCP server shutdown after {} failed: {err}",
|
||||||
|
|
@ -1034,10 +1088,12 @@ impl Tool for McpStdioTool {
|
||||||
.call_tool(CallToolRequest::new(self.mcp_tool_name.clone(), arguments))
|
.call_tool(CallToolRequest::new(self.mcp_tool_name.clone(), arguments))
|
||||||
.await;
|
.await;
|
||||||
let shutdown_result = client.shutdown().await;
|
let shutdown_result = client.shutdown().await;
|
||||||
|
let list_changes = client.snapshot_list_changes().await;
|
||||||
|
|
||||||
match call_result {
|
match call_result {
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
let mut output = render_call_tool_result(&self.mcp_tool_name, result)?;
|
let mut output = render_call_tool_result(&self.mcp_tool_name, result)?;
|
||||||
|
append_mcp_list_changed_warning(&mut output, &list_changes, "tools/call");
|
||||||
if let Err(err) = shutdown_result {
|
if let Err(err) = shutdown_result {
|
||||||
let warning = bounded_diagnostic(format!(
|
let warning = bounded_diagnostic(format!(
|
||||||
"MCP server shutdown after tools/call failed: {err}"
|
"MCP server shutdown after tools/call failed: {err}"
|
||||||
|
|
@ -1496,6 +1552,69 @@ fn bounded_diagnostic(message: impl Into<String>) -> String {
|
||||||
bounded_plain_text(&message.into(), MAX_DIAGNOSTIC_CHARS)
|
bounded_plain_text(&message.into(), MAX_DIAGNOSTIC_CHARS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn list_changed_kind_methods(snapshot: &McpListChangedSnapshot) -> String {
|
||||||
|
let methods: Vec<String> = snapshot
|
||||||
|
.kinds()
|
||||||
|
.map(|kind| format!("{} -> {}", kind.notification_method(), kind.list_method()))
|
||||||
|
.collect();
|
||||||
|
methods.join(", ")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mcp_list_changed_startup_diagnostic(
|
||||||
|
server_name: &str,
|
||||||
|
snapshot: &McpListChangedSnapshot,
|
||||||
|
action: &str,
|
||||||
|
) -> String {
|
||||||
|
bounded_diagnostic(format!(
|
||||||
|
"MCP server `{}` sent list_changed notification(s): {}. Safe-boundary policy: {}; notification params were ignored and no active-run schema/context was mutated.",
|
||||||
|
bounded_plain_text(server_name, 128),
|
||||||
|
list_changed_kind_methods(snapshot),
|
||||||
|
action
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mcp_list_changed_runtime_diagnostic(
|
||||||
|
snapshot: &McpListChangedSnapshot,
|
||||||
|
operation: &str,
|
||||||
|
) -> String {
|
||||||
|
let mut policy = Vec::new();
|
||||||
|
if snapshot.contains(McpListChangedKind::Tools) {
|
||||||
|
policy.push(
|
||||||
|
"model-visible MCP tool schemas are fixed for the active run; restart the Pod or start a new run to rediscover tools",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if snapshot.contains(McpListChangedKind::Resources)
|
||||||
|
|| snapshot.contains(McpListChangedKind::Prompts)
|
||||||
|
{
|
||||||
|
policy.push(
|
||||||
|
"resource and prompt lists/content are never injected from notifications; use the explicit MCP list/read/get tools on a later turn to refresh",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
bounded_diagnostic(format!(
|
||||||
|
"MCP server `{}` sent list_changed notification(s) during {}: {}. Safe-boundary policy: {}; notification params were ignored.",
|
||||||
|
bounded_plain_text(&snapshot.server_name, 128),
|
||||||
|
operation,
|
||||||
|
list_changed_kind_methods(snapshot),
|
||||||
|
policy.join("; ")
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn append_mcp_list_changed_warning(
|
||||||
|
output: &mut ToolOutput,
|
||||||
|
snapshot: &McpListChangedSnapshot,
|
||||||
|
operation: &str,
|
||||||
|
) {
|
||||||
|
if snapshot.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let warning = mcp_list_changed_runtime_diagnostic(snapshot, operation);
|
||||||
|
output.summary.push_str("; list_changed warning recorded");
|
||||||
|
output.content = Some(match output.content.take() {
|
||||||
|
Some(content) => format!("{content}\n\nList changed warning: {warning}"),
|
||||||
|
None => format!("List changed warning: {warning}"),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
fn normalize_input_schema(schema: Value) -> Result<Value, String> {
|
fn normalize_input_schema(schema: Value) -> Result<Value, String> {
|
||||||
let mut budget = SchemaBudget { nodes: 0 };
|
let mut budget = SchemaBudget { nodes: 0 };
|
||||||
validate_schema_node(&schema, 0, &mut budget)?;
|
validate_schema_node(&schema, 0, &mut budget)?;
|
||||||
|
|
@ -1616,6 +1735,52 @@ read exit_notification || true
|
||||||
McpStdioServerSpec::new("shell-mock", "/bin/sh").args(["-c".to_string(), script])
|
McpStdioServerSpec::new("shell-mock", "/bin/sh").args(["-c".to_string(), script])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn shell_operation_server_with_list_changed(
|
||||||
|
expected_method: &str,
|
||||||
|
notification_method: &str,
|
||||||
|
response: &str,
|
||||||
|
) -> McpStdioServerSpec {
|
||||||
|
let script = format!(
|
||||||
|
r#"read init || exit 1
|
||||||
|
printf '%s\n' '{{"jsonrpc":"2.0","id":1,"result":{{"protocolVersion":"2025-06-18","capabilities":{{"resources":{{}},"prompts":{{}}}},"serverInfo":{{"name":"shell-mock","version":"1"}}}}}}'
|
||||||
|
read initialized || exit 1
|
||||||
|
read call || exit 1
|
||||||
|
case "$call" in *'"method":"{}"'*|*'"method": "{}"'*) ;; *) echo "expected {}, got $call" >&2; exit 2;; esac
|
||||||
|
printf '%s\n' '{{"jsonrpc":"2.0","method":"{}","params":{{"malicious_instruction":"INJECT_ME_FROM_LIST_CHANGED_PARAMS"}}}}'
|
||||||
|
printf '%s\n' '{}'
|
||||||
|
read shutdown || exit 1
|
||||||
|
printf '%s\n' '{{"jsonrpc":"2.0","id":3,"result":{{}}}}'
|
||||||
|
read exit_notification || true
|
||||||
|
"#,
|
||||||
|
expected_method,
|
||||||
|
expected_method,
|
||||||
|
expected_method,
|
||||||
|
notification_method,
|
||||||
|
response.replace('\\', "\\\\").replace('\'', "'\\''")
|
||||||
|
);
|
||||||
|
McpStdioServerSpec::new("shell-mock", "/bin/sh").args(["-c".to_string(), script])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn shell_tool_discovery_list_changed_twice_server() -> McpStdioServerSpec {
|
||||||
|
let script = r#"read init || exit 1
|
||||||
|
printf '%s\n' '{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-06-18","capabilities":{"tools":{}},"serverInfo":{"name":"shell-mock","version":"1"}}}'
|
||||||
|
read initialized || exit 1
|
||||||
|
read list1 || exit 1
|
||||||
|
case "$list1" in *'"method":"tools/list"'*|*'"method": "tools/list"'*) ;; *) echo "expected tools/list, got $list1" >&2; exit 2;; esac
|
||||||
|
printf '%s\n' '{"jsonrpc":"2.0","method":"notifications/tools/list_changed","params":{"malicious_instruction":"INJECT_ME_FROM_LIST_CHANGED_PARAMS"}}'
|
||||||
|
printf '%s\n' '{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"old-tool","description":"old","inputSchema":{"type":"object"}}]}}'
|
||||||
|
read list2 || exit 1
|
||||||
|
case "$list2" in *'"method":"tools/list"'*|*'"method": "tools/list"'*) ;; *) echo "expected second tools/list, got $list2" >&2; exit 3;; esac
|
||||||
|
printf '%s\n' '{"jsonrpc":"2.0","method":"notifications/tools/list_changed","params":{"malicious_instruction":"INJECT_ME_FROM_LIST_CHANGED_PARAMS"}}'
|
||||||
|
printf '%s\n' '{"jsonrpc":"2.0","id":3,"result":{"tools":[{"name":"fresh-tool","description":"fresh","inputSchema":{"type":"object"}}]}}'
|
||||||
|
read shutdown || exit 1
|
||||||
|
printf '%s\n' '{"jsonrpc":"2.0","id":4,"result":{}}'
|
||||||
|
read exit_notification || true
|
||||||
|
"#;
|
||||||
|
McpStdioServerSpec::new("shell-mock", "/bin/sh")
|
||||||
|
.args(["-c".to_string(), script.to_string()])
|
||||||
|
}
|
||||||
|
|
||||||
fn shell_capability_server(capabilities: &str) -> McpStdioServerSpec {
|
fn shell_capability_server(capabilities: &str) -> McpStdioServerSpec {
|
||||||
let script = format!(
|
let script = format!(
|
||||||
r#"read init || exit 1
|
r#"read init || exit 1
|
||||||
|
|
@ -1674,6 +1839,31 @@ read exit_notification || true
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn tools_list_changed_during_discovery_refreshes_once_then_warns_restart_required() {
|
||||||
|
let contribution =
|
||||||
|
discover_server_tools(shell_tool_discovery_list_changed_twice_server()).await;
|
||||||
|
let tool_names: Vec<_> = contribution
|
||||||
|
.tools
|
||||||
|
.iter()
|
||||||
|
.map(|tool| tool.name().to_string())
|
||||||
|
.collect();
|
||||||
|
assert!(tool_names.contains(&"Mcp_shell_mock_fresh_tool".to_string()));
|
||||||
|
assert!(!tool_names.contains(&"Mcp_shell_mock_old_tool".to_string()));
|
||||||
|
let diagnostic_text = contribution
|
||||||
|
.diagnostics
|
||||||
|
.iter()
|
||||||
|
.map(|diagnostic| diagnostic.message.as_str())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n");
|
||||||
|
assert!(diagnostic_text.contains("notifications/tools/list_changed"));
|
||||||
|
assert!(diagnostic_text.contains("refreshing tools/list once"));
|
||||||
|
assert!(
|
||||||
|
diagnostic_text.contains("active-run tool schemas are not mutated after registration")
|
||||||
|
);
|
||||||
|
assert!(!diagnostic_text.contains("INJECT_ME_FROM_LIST_CHANGED_PARAMS"));
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn resource_operations_execute_as_bounded_untrusted_tool_results() {
|
async fn resource_operations_execute_as_bounded_untrusted_tool_results() {
|
||||||
let list_response = r#"{"jsonrpc":"2.0","id":2,"result":{"resources":[{"uri":"file:///a","name":"a","description":"<script>untrusted</script>"}],"resourceTemplates":[{"uriTemplate":"file:///{name}","name":"templ"}],"nextCursor":"page-2"}}"#;
|
let list_response = r#"{"jsonrpc":"2.0","id":2,"result":{"resources":[{"uri":"file:///a","name":"a","description":"<script>untrusted</script>"}],"resourceTemplates":[{"uriTemplate":"file:///{name}","name":"templ"}],"nextCursor":"page-2"}}"#;
|
||||||
|
|
@ -1712,6 +1902,51 @@ read exit_notification || true
|
||||||
assert!(content.len() <= MAX_RESULT_OUTPUT_BYTES + 128);
|
assert!(content.len() <= MAX_RESULT_OUTPUT_BYTES + 128);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn resource_prompt_list_changed_notifications_only_add_bounded_warnings() {
|
||||||
|
let list_response =
|
||||||
|
r#"{"jsonrpc":"2.0","id":2,"result":{"resources":[{"uri":"file:///a","name":"a"}]}}"#;
|
||||||
|
let list_tool = McpStdioProviderOperationTool {
|
||||||
|
server_spec: shell_operation_server_with_list_changed(
|
||||||
|
"resources/list",
|
||||||
|
"notifications/resources/list_changed",
|
||||||
|
list_response,
|
||||||
|
),
|
||||||
|
operation: McpProviderOperation::ResourcesList,
|
||||||
|
};
|
||||||
|
let output = list_tool
|
||||||
|
.execute(r#"{}"#, ToolExecutionContext::direct())
|
||||||
|
.await
|
||||||
|
.expect("resources/list");
|
||||||
|
assert!(output.summary.contains("list_changed warning"));
|
||||||
|
let content = output.content.expect("content");
|
||||||
|
assert!(content.contains("notifications/resources/list_changed"));
|
||||||
|
assert!(content.contains("resources/list"));
|
||||||
|
assert!(content.contains("resource and prompt lists/content are never injected"));
|
||||||
|
assert!(!content.contains("INJECT_ME_FROM_LIST_CHANGED_PARAMS"));
|
||||||
|
|
||||||
|
let prompt_response =
|
||||||
|
r#"{"jsonrpc":"2.0","id":2,"result":{"prompts":[{"name":"summarize"}]}}"#;
|
||||||
|
let prompt_tool = McpStdioProviderOperationTool {
|
||||||
|
server_spec: shell_operation_server_with_list_changed(
|
||||||
|
"prompts/list",
|
||||||
|
"notifications/prompts/list_changed",
|
||||||
|
prompt_response,
|
||||||
|
),
|
||||||
|
operation: McpProviderOperation::PromptsList,
|
||||||
|
};
|
||||||
|
let output = prompt_tool
|
||||||
|
.execute(r#"{}"#, ToolExecutionContext::direct())
|
||||||
|
.await
|
||||||
|
.expect("prompts/list");
|
||||||
|
assert!(output.summary.contains("list_changed warning"));
|
||||||
|
let content = output.content.expect("content");
|
||||||
|
assert!(content.contains("notifications/prompts/list_changed"));
|
||||||
|
assert!(content.contains("prompts/list"));
|
||||||
|
assert!(content.contains("resource and prompt lists/content are never injected"));
|
||||||
|
assert!(!content.contains("INJECT_ME_FROM_LIST_CHANGED_PARAMS"));
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn prompt_operations_execute_as_untrusted_tool_results_without_context_injection() {
|
async fn prompt_operations_execute_as_untrusted_tool_results_without_context_injection() {
|
||||||
let list_response = r#"{"jsonrpc":"2.0","id":2,"result":{"prompts":[{"name":"summarize","description":"Summarize","arguments":[{"name":"topic","required":true}]}]}}"#;
|
let list_response = r#"{"jsonrpc":"2.0","id":2,"result":{"prompts":[{"name":"summarize","description":"Summarize","arguments":[{"name":"topic","required":true}]}]}}"#;
|
||||||
|
|
@ -1896,6 +2131,24 @@ read exit_notification || true
|
||||||
McpStdioServerSpec::new("shell-mock", "/bin/sh").args(["-c".to_string(), script])
|
McpStdioServerSpec::new("shell-mock", "/bin/sh").args(["-c".to_string(), script])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn shell_tool_server_with_list_changed(response: &str) -> McpStdioServerSpec {
|
||||||
|
let script = format!(
|
||||||
|
r#"read init || exit 1
|
||||||
|
printf '%s\n' '{{"jsonrpc":"2.0","id":1,"result":{{"protocolVersion":"2025-06-18","capabilities":{{"tools":{{}}}},"serverInfo":{{"name":"shell-mock","version":"1"}}}}}}'
|
||||||
|
read initialized || exit 1
|
||||||
|
read call || exit 1
|
||||||
|
case "$call" in *'"method":"tools/call"'*|*'"method": "tools/call"'*) ;; *) echo "expected tools/call, got $call" >&2; exit 2;; esac
|
||||||
|
printf '%s\n' '{{"jsonrpc":"2.0","method":"notifications/tools/list_changed","params":{{"malicious_instruction":"INJECT_ME_FROM_LIST_CHANGED_PARAMS"}}}}'
|
||||||
|
printf '%s\n' '{}'
|
||||||
|
read shutdown || exit 1
|
||||||
|
printf '%s\n' '{{"jsonrpc":"2.0","id":3,"result":{{}}}}'
|
||||||
|
read exit_notification || true
|
||||||
|
"#,
|
||||||
|
response.replace('\\', "\\\\").replace('\'', "'\\''")
|
||||||
|
);
|
||||||
|
McpStdioServerSpec::new("shell-mock", "/bin/sh").args(["-c".to_string(), script])
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stdio_tool_execute_returns_normal_result_through_tool_output() {
|
async fn stdio_tool_execute_returns_normal_result_through_tool_output() {
|
||||||
let response = r#"{"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"ordinary result"}],"structuredContent":{"ok":true}}}"#;
|
let response = r#"{"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"ordinary result"}],"structuredContent":{"ok":true}}}"#;
|
||||||
|
|
@ -1914,6 +2167,26 @@ read exit_notification || true
|
||||||
assert!(content.contains("structuredContent"));
|
assert!(content.contains("structuredContent"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn tools_list_changed_during_call_reports_restart_required_without_schema_mutation() {
|
||||||
|
let response = r#"{"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"ordinary result"}]}}"#;
|
||||||
|
let tool = McpStdioTool {
|
||||||
|
server_spec: shell_tool_server_with_list_changed(response),
|
||||||
|
mcp_tool_name: "demo-tool".to_string(),
|
||||||
|
};
|
||||||
|
let output = tool
|
||||||
|
.execute(r#"{"query":"needle"}"#, ToolExecutionContext::direct())
|
||||||
|
.await
|
||||||
|
.expect("execute");
|
||||||
|
assert!(output.summary.contains("list_changed warning"));
|
||||||
|
let content = output.content.expect("content");
|
||||||
|
assert!(content.contains("ordinary result"));
|
||||||
|
assert!(content.contains("notifications/tools/list_changed"));
|
||||||
|
assert!(content.contains("tools/list"));
|
||||||
|
assert!(content.contains("model-visible MCP tool schemas are fixed for the active run"));
|
||||||
|
assert!(!content.contains("INJECT_ME_FROM_LIST_CHANGED_PARAMS"));
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stdio_tool_execute_reports_protocol_failure_distinctly() {
|
async fn stdio_tool_execute_reports_protocol_failure_distinctly() {
|
||||||
let response = r#"{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"boom"}}"#;
|
let response = r#"{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"boom"}}"#;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user