Skip to content

Commit a8e34de

Browse files
authored
upgrade rmcp crate (#7518)
1 parent 7ad6e87 commit a8e34de

File tree

10 files changed

+73
-55
lines changed

10 files changed

+73
-55
lines changed

backend/Cargo.lock

Lines changed: 47 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/windmill-api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ mcp = ["dep:rmcp"]
4040
python = []
4141

4242
[dependencies]
43-
rmcp = { version = "0.8.1", features=["transport-streamable-http-server", "transport-streamable-http-server-session", "transport-worker"], optional = true }
43+
rmcp = { version = "0.12.0", features=["transport-streamable-http-server", "transport-streamable-http-server-session", "transport-worker"], optional = true }
4444
windmill-queue.workspace = true
4545
windmill-common = { workspace = true, default-features = false }
4646
windmill-audit.workspace = true

backend/windmill-api/src/lib.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ use crate::oauth2_oss::SlackVerifier;
1919
use crate::smtp_server_oss::SmtpServer;
2020

2121
#[cfg(feature = "mcp")]
22-
use crate::mcp::{extract_and_store_workspace_id, setup_mcp_server, shutdown_mcp_server};
22+
use crate::mcp::{extract_and_store_workspace_id, setup_mcp_server};
2323
use crate::triggers::start_all_listeners;
24-
#[cfg(feature = "mcp")]
25-
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
2624
use tower_http::catch_panic::CatchPanicLayer;
2725

2826
use crate::tracing_init::MyOnFailure;
@@ -400,14 +398,17 @@ pub async fn run_server(
400398

401399
// Setup MCP server
402400
#[allow(unused_variables)]
403-
let (mcp_router, mcp_session_manager) = {
401+
let (mcp_router, mcp_cancellation_token) = {
404402
#[cfg(feature = "mcp")]
405403
if server_mode || mcp_mode {
406-
let (mcp_router, mcp_session_manager) = setup_mcp_server().await?;
404+
let (mcp_router, mcp_cancellation_token) = setup_mcp_server().await?;
407405
let mcp_middleware = axum::middleware::from_fn(extract_and_store_workspace_id);
408-
(mcp_router.layer(mcp_middleware), Some(mcp_session_manager))
406+
(
407+
mcp_router.layer(mcp_middleware),
408+
Some(mcp_cancellation_token),
409+
)
409410
} else {
410-
(Router::new(), Option::<Arc<LocalSessionManager>>::None)
411+
(Router::new(), None)
411412
}
412413

413414
#[cfg(not(feature = "mcp"))]
@@ -757,8 +758,8 @@ pub async fn run_server(
757758
tracing::info!("Graceful shutdown of server");
758759

759760
#[cfg(feature = "mcp")]
760-
if let Some(mcp_session_manager) = mcp_session_manager {
761-
shutdown_mcp_server(mcp_session_manager).await;
761+
if let Some(mcp_cancellation_token) = mcp_cancellation_token {
762+
mcp_cancellation_token.cancel();
762763
tracing::info!("MCP server shutdown");
763764
}
764765
});

backend/windmill-api/src/mcp/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ pub mod tools;
88
pub mod utils;
99

1010
// Re-export main components
11-
pub use server::{extract_and_store_workspace_id, setup_mcp_server, shutdown_mcp_server, list_tools_service};
11+
pub use server::{extract_and_store_workspace_id, list_tools_service, setup_mcp_server};

backend/windmill-api/src/mcp/server.rs

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use rmcp::{
1818
};
1919
use serde_json::Value;
2020
use tokio::try_join;
21+
use tokio_util::sync::CancellationToken;
2122
use windmill_common::db::UserDB;
2223
use windmill_common::worker::to_raw_value;
2324
use windmill_common::{utils::StripPath, DB};
@@ -47,7 +48,7 @@ use axum::{
4748
extract::Path, http::Request, middleware::Next, response::Response, routing::get, Json, Router,
4849
};
4950
use rmcp::transport::streamable_http_server::{
50-
session::local::LocalSessionManager, SessionManager, StreamableHttpService,
51+
session::local::LocalSessionManager, StreamableHttpService,
5152
};
5253
use windmill_common::error::JsonResult;
5354

@@ -126,6 +127,7 @@ impl Runner {
126127
idempotent_hint: Some(false), // Are not guaranteed to be idempotent
127128
open_world_hint: Some(true), // Can interact with external services
128129
}),
130+
meta: None,
129131
})
130132
}
131133
}
@@ -471,7 +473,7 @@ impl ServerHandler for Runner {
471473
);
472474
}
473475

474-
Ok(ListToolsResult { tools, next_cursor: None })
476+
Ok(ListToolsResult { tools, next_cursor: None, meta: None })
475477
}
476478

477479
fn get_info(&self) -> ServerInfo {
@@ -498,7 +500,7 @@ impl ServerHandler for Runner {
498500
_request: Option<PaginatedRequestParam>,
499501
_context: RequestContext<RoleServer>,
500502
) -> Result<ListResourcesResult, ErrorData> {
501-
Ok(ListResourcesResult { resources: vec![], next_cursor: None })
503+
Ok(ListResourcesResult { resources: vec![], next_cursor: None, meta: None })
502504
}
503505

504506
async fn list_prompts(
@@ -530,11 +532,13 @@ pub async fn extract_and_store_workspace_id(
530532
}
531533

532534
/// Setup the MCP server with HTTP transport
533-
pub async fn setup_mcp_server() -> anyhow::Result<(Router, Arc<LocalSessionManager>)> {
535+
pub async fn setup_mcp_server() -> anyhow::Result<(Router, CancellationToken)> {
536+
let cancellation_token = CancellationToken::new();
534537
let session_manager = Arc::new(LocalSessionManager::default());
535538
let service_config = StreamableHttpServerConfig {
536539
sse_keep_alive: Some(Duration::from_secs(15)),
537540
stateful_mode: false,
541+
cancellation_token: cancellation_token.clone(),
538542
};
539543
let service = StreamableHttpService::new(
540544
|| Ok(Runner::new()),
@@ -543,34 +547,7 @@ pub async fn setup_mcp_server() -> anyhow::Result<(Router, Arc<LocalSessionManag
543547
);
544548

545549
let router = axum::Router::new().nest_service("/", service);
546-
Ok((router, session_manager))
547-
}
548-
549-
/// Shutdown the MCP server gracefully by closing all active sessions
550-
pub async fn shutdown_mcp_server(session_manager: Arc<LocalSessionManager>) {
551-
let session_ids_to_close = {
552-
let sessions_map = session_manager.sessions.read().await;
553-
sessions_map.keys().cloned().collect::<Vec<_>>()
554-
};
555-
556-
if !session_ids_to_close.is_empty() {
557-
tracing::info!(
558-
"Closing {} active MCP session(s)...",
559-
session_ids_to_close.len()
560-
);
561-
let close_futures = session_ids_to_close
562-
.iter()
563-
.map(|session_id| {
564-
let manager_clone = session_manager.clone();
565-
async move {
566-
if let Err(_) = manager_clone.close_session(session_id).await {
567-
tracing::warn!("Error closing MCP session");
568-
}
569-
}
570-
})
571-
.collect::<Vec<_>>();
572-
futures::future::join_all(close_futures).await;
573-
}
550+
Ok((router, cancellation_token))
574551
}
575552

576553
/// HTTP handler to list MCP tools as JSON

backend/windmill-api/src/mcp/tools/endpoint_tools.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub fn endpoint_tool_to_mcp_tool(tool: &EndpointTool) -> Tool {
6161
output_schema: None,
6262
icons: None,
6363
annotations: Some(annotations),
64+
meta: None,
6465
}
6566
}
6667

backend/windmill-common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ tempfile.workspace = true
108108
systemstat.workspace = true
109109
size.workspace = true
110110
globset.workspace = true
111-
rmcp = { version = "0.8.1", features = ["client", "transport-streamable-http-client", "transport-streamable-http-client-reqwest"] }
111+
rmcp = { version = "0.12.0", features = ["client", "transport-streamable-http-client", "transport-streamable-http-client-reqwest"] }
112112

113113
opentelemetry-semantic-conventions = { workspace = true, optional = true }
114114
opentelemetry-otlp = { workspace = true, optional = true }

backend/windmill-common/src/mcp_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
55
use serde_json::{json, Value};
66
use std::str::FromStr;
77

8-
use rmcp::model::Tool as McpTool;
8+
pub use rmcp::model::Tool as McpTool;
99
use rmcp::{
1010
model::{
1111
CallToolRequestParam, ClientCapabilities, ClientInfo, Implementation,

backend/windmill-worker/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ windmill-parser-sql.workspace = true
5757
windmill-parser-graphql.workspace = true
5858
windmill-parser-php = { workspace = true, optional = true }
5959
windmill-git-sync.workspace = true
60-
rmcp = { version = "0.8.1", features = ["client", "transport-streamable-http-client", "transport-streamable-http-client-reqwest"] }
6160
aws-sdk-bedrockruntime.workspace = true
6261
aws-config.workspace = true
6362
aws-credential-types.workspace = true

backend/windmill-worker/src/ai/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use windmill_common::{
2020
};
2121
use windmill_common::{
2222
flows::FlowModuleValue,
23-
mcp_client::{McpClient, McpResource, McpToolSource},
23+
mcp_client::{McpClient, McpResource, McpTool, McpToolSource},
2424
};
2525
use windmill_queue::{flow_status::get_step_of_flow_status, MiniPulledJob};
2626

@@ -352,7 +352,7 @@ pub async fn cleanup_mcp_clients(mcp_clients: HashMap<String, Arc<McpClient>>) {
352352

353353
/// Convert raw MCP tools to Windmill Tool format with source tracking
354354
fn convert_mcp_tools_to_windmill_tools(
355-
mcp_tools: &[rmcp::model::Tool],
355+
mcp_tools: &[McpTool],
356356
resource_name: &str,
357357
resource_path: &str,
358358
) -> Result<Vec<Tool>, Error> {

0 commit comments

Comments
 (0)