11use bytes:: Bytes ;
22use rmcp:: {
3- ErrorData as McpError , Json , ServerHandler , ServiceExt ,
3+ ErrorData as McpError , Json , ServerHandler ,
44 handler:: server:: tool:: ToolRouter ,
55 handler:: server:: wrapper:: Parameters ,
66 model:: { ServerCapabilities , ServerInfo } ,
77 tool, tool_handler, tool_router,
8- transport:: stdio,
8+ transport:: streamable_http_server:: {
9+ StreamableHttpServerConfig ,
10+ StreamableHttpService ,
11+ session:: local:: LocalSessionManager ,
12+ } ,
913} ;
1014use schemars:: JsonSchema ;
1115use serde:: { Deserialize , Serialize } ;
@@ -18,11 +22,12 @@ use std::{
1822} ;
1923use thiserror:: Error ;
2024use tokio:: time:: sleep;
25+ use tokio_util:: sync:: CancellationToken ;
2126use tracing:: { error, info, warn} ;
2227use uuid:: Uuid ;
2328use walkdir:: WalkDir ;
2429
25- // HTTP 健康检查相关
30+ // HTTP 相关
2631use axum:: {
2732 extract:: State ,
2833 response:: Json as AxumJson ,
@@ -1010,22 +1015,6 @@ fn init_tracing() {
10101015 tracing_subscriber:: fmt ( ) . with_env_filter ( filter) . init ( ) ;
10111016}
10121017
1013- /// 启动 HTTP 健康检查服务器
1014- pub async fn run_health_check_server (
1015- server : MineruServer ,
1016- port : u16 ,
1017- ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
1018- let app = Router :: new ( )
1019- . route ( "/health" , get ( health_handler) )
1020- . route ( "/" , get ( root_handler) )
1021- . with_state ( server) ;
1022-
1023- let listener = tokio:: net:: TcpListener :: bind ( format ! ( "0.0.0.0:{port}" ) ) . await ?;
1024- info ! ( "健康检查服务器启动在 http://0.0.0.0:{}" , port) ;
1025- axum:: serve ( listener, app) . await ?;
1026- Ok ( ( ) )
1027- }
1028-
10291018async fn health_handler ( State ( server) : State < MineruServer > ) -> AxumJson < HealthCheckResponse > {
10301019 let is_healthy = server. healthy . load ( Ordering :: Relaxed ) ;
10311020 AxumJson ( HealthCheckResponse {
@@ -1044,37 +1033,56 @@ async fn health_handler(State(server): State<MineruServer>) -> AxumJson<HealthCh
10441033}
10451034
10461035async fn root_handler ( ) -> & ' static str {
1047- "MinerU MCP DragonOS Server - Health Check: GET /health "
1036+ "MinerU MCP DragonOS Server\n \n Endpoints: \n GET /health - Health check \n POST /mcp - MCP JSON-RPC requests \n GET /mcp - SSE event stream "
10481037}
10491038
1039+ /// 启动 HTTP/SSE MCP 服务器
10501040pub async fn run ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
10511041 init_tracing ( ) ;
10521042 let settings = Settings :: from_env ( ) ;
10531043 if !settings. use_local_api && settings. mineru_api_key . is_none ( ) {
10541044 warn ! ( "MINERU_API_KEY 未设置,远程解析将失败" ) ;
10551045 }
1056- let server = MineruServer :: new ( settings) ?;
10571046
1058- // 从环境变量读取健康检查端口,默认 3000
1059- let health_port = env:: var ( "HEALTH_CHECK_PORT " )
1047+ let ct = CancellationToken :: new ( ) ;
1048+ let port = env:: var ( "MCP_PORT " )
10601049 . ok ( )
10611050 . and_then ( |v| v. parse :: < u16 > ( ) . ok ( ) )
1062- . unwrap_or ( 3000 ) ;
1051+ . unwrap_or ( 8080 ) ;
1052+
1053+ // 创建 HTTP/SSE MCP 服务
1054+ let mcp_service: StreamableHttpService < MineruServer , LocalSessionManager > =
1055+ StreamableHttpService :: new (
1056+ || {
1057+ let s = Settings :: from_env ( ) ;
1058+ MineruServer :: new ( s) . map_err ( |e| std:: io:: Error :: other ( e. to_string ( ) ) )
1059+ } ,
1060+ Arc :: new ( LocalSessionManager :: default ( ) ) ,
1061+ StreamableHttpServerConfig {
1062+ stateful_mode : true ,
1063+ sse_keep_alive : Some ( Duration :: from_secs ( 15 ) ) ,
1064+ cancellation_token : ct. child_token ( ) ,
1065+ ..Default :: default ( )
1066+ } ,
1067+ ) ;
1068+
1069+ // 创建一个用于健康检查的 server 实例
1070+ let health_server = MineruServer :: new ( settings) ?;
1071+
1072+ // 统一的 Axum 路由
1073+ let app = Router :: new ( )
1074+ . route ( "/health" , get ( health_handler) )
1075+ . route ( "/" , get ( root_handler) )
1076+ . with_state ( health_server)
1077+ . nest_service ( "/mcp" , mcp_service) ;
1078+
1079+ let listener = tokio:: net:: TcpListener :: bind ( format ! ( "0.0.0.0:{}" , port) ) . await ?;
1080+ info ! ( "MCP HTTP/SSE 服务启动在 http://0.0.0.0:{}" , port) ;
1081+ info ! ( "端点: GET /health, POST /mcp, GET /mcp" ) ;
1082+
1083+ axum:: serve ( listener, app)
1084+ . with_graceful_shutdown ( async move { ct. cancelled ( ) . await } )
1085+ . await ?;
10631086
1064- // 启动 HTTP 健康检查服务器
1065- let server_clone = server. clone ( ) ;
1066- tokio:: spawn ( async move {
1067- if let Err ( err) = run_health_check_server ( server_clone, health_port) . await {
1068- error ! ( "健康检查服务器错误: {err}" ) ;
1069- }
1070- } ) ;
1071-
1072- // 启动 MCP stdio 服务
1073- let service = server. serve ( stdio ( ) )
1074- . await
1075- . inspect_err ( |err| {
1076- error ! ( "启动MCP服务失败: {err}" ) ;
1077- } ) ?;
1078- service. waiting ( ) . await ?;
10791087 Ok ( ( ) )
10801088}
0 commit comments