Skip to content

Commit 656022f

Browse files
dingfelikensave
authored andcommitted
feat(mcp): adds sse support (#2995)
* adds sse support * makes retry agnostic to error type
1 parent 7278289 commit 656022f

File tree

4 files changed

+356
-233
lines changed

4 files changed

+356
-233
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ winnow = "=0.6.2"
129129
winreg = "0.55.0"
130130
schemars = "1.0.4"
131131
jsonschema = "0.30.0"
132-
rmcp = { version = "0.6.3", features = ["client", "transport-sse-client-reqwest", "reqwest", "transport-streamable-http-client-reqwest", "transport-child-process", "tower", "auth"] }
132+
rmcp = { version = "0.7.0", features = ["client", "transport-sse-client-reqwest", "reqwest", "transport-streamable-http-client-reqwest", "transport-child-process", "tower", "auth"] }
133133

134134
[workspace.lints.rust]
135135
future_incompatible = "warn"

crates/chat-cli/src/mcp_client/client.rs

Lines changed: 46 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use regex::Regex;
66
use rmcp::model::{
77
CallToolRequestParam,
88
CallToolResult,
9+
ClientResult,
910
ErrorCode,
1011
GetPromptRequestParam,
1112
GetPromptResult,
@@ -42,17 +43,15 @@ use tokio::process::{
4243
};
4344
use tokio::task::JoinHandle;
4445
use tracing::{
45-
debug,
4646
error,
4747
info,
4848
};
4949

5050
use super::messenger::Messenger;
51-
use super::oauth_util::HttpTransport;
5251
use super::{
5352
AuthClientWrapper,
53+
HttpServiceBuilder,
5454
OauthUtilError,
55-
get_http_transport,
5655
};
5756
use crate::cli::chat::server_messenger::ServerMessenger;
5857
use crate::cli::chat::tools::custom_tool::{
@@ -266,37 +265,10 @@ impl RunningService {
266265
decorate_with_auth_retry!(GetPromptRequestParam, get_prompt, GetPromptResult);
267266
}
268267

269-
pub type StdioTransport = (TokioChildProcess, Option<ChildStderr>);
270-
271-
// TODO: add sse support (even though it's deprecated)
272-
/// Represents the different transport mechanisms available for MCP (Model Context Protocol)
273-
/// communication.
274-
///
275-
/// This enum encapsulates the two primary ways to communicate with MCP servers:
276-
/// - HTTP-based transport for remote servers
277-
/// - Standard I/O transport for local process-based servers
278-
pub enum Transport {
279-
/// HTTP transport for communicating with remote MCP servers over network protocols.
280-
/// Uses a streamable HTTP client with authentication support.
281-
Http(HttpTransport),
282-
/// Standard I/O transport for communicating with local MCP servers via child processes.
283-
/// Communication happens through stdin/stdout pipes.
284-
Stdio(StdioTransport),
285-
}
286-
287-
impl std::fmt::Debug for Transport {
288-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289-
match self {
290-
Transport::Http(_) => f.debug_tuple("Http").field(&"HttpTransport").finish(),
291-
Transport::Stdio(_) => f.debug_tuple("Stdio").field(&"TokioChildProcess").finish(),
292-
}
293-
}
294-
}
295-
296268
/// This struct implements the [Service] trait from rmcp. It is within this trait the logic of
297269
/// server driven data flow (i.e. requests and notifications that are sent from the server) are
298270
/// handled.
299-
#[derive(Debug)]
271+
#[derive(Clone, Debug)]
300272
pub struct McpClientService {
301273
pub config: CustomToolConfig,
302274
server_name: String,
@@ -312,103 +284,14 @@ impl McpClientService {
312284
}
313285
}
314286

315-
pub async fn init(mut self, os: &Os) -> Result<InitializedMcpClient, McpClientError> {
287+
pub async fn init(self, os: &Os) -> Result<InitializedMcpClient, McpClientError> {
316288
let os_clone = os.clone();
317289

318290
let handle: JoinHandle<Result<RunningService, McpClientError>> = tokio::spawn(async move {
319291
let messenger_clone = self.messenger.clone();
320292
let server_name = self.server_name.clone();
321-
let backup_config = self.config.clone();
322-
323-
let result: Result<_, McpClientError> = async {
324-
let messenger_dup = messenger_clone.duplicate();
325-
let (service, stderr, auth_client) = match self.get_transport(&os_clone, &*messenger_dup).await? {
326-
Transport::Stdio((child_process, stderr)) => {
327-
let service = self
328-
.into_dyn()
329-
.serve::<TokioChildProcess, _, _>(child_process)
330-
.await
331-
.map_err(Box::new)?;
332-
333-
(service, stderr, None)
334-
},
335-
Transport::Http(http_transport) => {
336-
match http_transport {
337-
HttpTransport::WithAuth((transport, mut auth_client)) => {
338-
// The crate does not automatically refresh tokens when they expire. We
339-
// would need to handle that here
340-
let url = &backup_config.url;
341-
let service = match self.into_dyn().serve(transport).await.map_err(Box::new) {
342-
Ok(service) => service,
343-
Err(e) if matches!(*e, ClientInitializeError::ConnectionClosed(_)) => {
344-
debug!("## mcp: first hand shake attempt failed: {:?}", e);
345-
let refresh_res = auth_client.refresh_token().await;
346-
let new_self = McpClientService::new(
347-
server_name.clone(),
348-
backup_config.clone(),
349-
messenger_clone.clone(),
350-
);
351-
352-
let scopes = &backup_config.oauth_scopes;
353-
let timeout = backup_config.timeout;
354-
let headers = &backup_config.headers;
355-
let new_transport =
356-
get_http_transport(&os_clone, url, timeout, scopes, headers,Some(auth_client.auth_client.clone()), &*messenger_dup).await?;
357-
358-
match new_transport {
359-
HttpTransport::WithAuth((new_transport, new_auth_client)) => {
360-
auth_client = new_auth_client;
361-
362-
match refresh_res {
363-
Ok(_) => {
364-
new_self.into_dyn().serve(new_transport).await.map_err(Box::new)?
365-
},
366-
Err(e) => {
367-
error!("## mcp: token refresh attempt failed: {:?}", e);
368-
info!("Retry for http transport failed {e}. Possible reauth needed");
369-
// This could be because the refresh token is expired, in which
370-
// case we would need to have user go through the auth flow
371-
// again. We do this by deleting the cred
372-
// and discarding the client to trigger a full auth flow
373-
tokio::fs::remove_file(&auth_client.cred_full_path).await?;
374-
let new_transport =
375-
get_http_transport(&os_clone, url, timeout, scopes,headers,None, &*messenger_dup).await?;
376-
377-
match new_transport {
378-
HttpTransport::WithAuth((new_transport, new_auth_client)) => {
379-
auth_client = new_auth_client;
380-
new_self.into_dyn().serve(new_transport).await.map_err(Box::new)?
381-
},
382-
HttpTransport::WithoutAuth(new_transport) => {
383-
new_self.into_dyn().serve(new_transport).await.map_err(Box::new)?
384-
},
385-
}
386-
},
387-
}
388-
},
389-
HttpTransport::WithoutAuth(new_transport) =>
390-
new_self.into_dyn().serve(new_transport).await.map_err(Box::new)?,
391-
}
392-
},
393-
Err(e) => return Err(e.into()),
394-
};
395-
396-
(service, None, Some(auth_client))
397-
},
398-
HttpTransport::WithoutAuth(transport) => {
399-
let service = self.into_dyn().serve(transport).await.map_err(Box::new)?;
400-
401-
(service, None, None)
402-
},
403-
}
404-
},
405-
};
406293

407-
Ok((service, stderr, auth_client))
408-
}
409-
.await;
410-
411-
let (service, child_stderr, auth_dropguard) = match result {
294+
let (service, child_stderr, auth_dropguard) = match self.into_service(&os_clone, &messenger_clone).await {
412295
Ok((service, stderr, auth_dg)) => (service, stderr, auth_dg),
413296
Err(e) => {
414297
let msg = e.to_string();
@@ -498,18 +381,24 @@ impl McpClientService {
498381
Ok(InitializedMcpClient::Pending(handle))
499382
}
500383

501-
async fn get_transport(&mut self, os: &Os, messenger: &dyn Messenger) -> Result<Transport, McpClientError> {
384+
async fn into_service(
385+
mut self,
386+
os: &Os,
387+
messenger: &dyn Messenger,
388+
) -> Result<
389+
(
390+
rmcp::service::RunningService<RoleClient, Box<dyn DynService<RoleClient>>>,
391+
Option<ChildStderr>,
392+
Option<AuthClientWrapper>,
393+
),
394+
McpClientError,
395+
> {
502396
let CustomToolConfig {
503397
r#type,
504398
url,
505-
headers,
506-
oauth_scopes: scopes,
507399
command: command_as_str,
508-
args,
509-
env: config_envs,
510-
timeout,
511400
..
512-
} = &mut self.config;
401+
} = &self.config;
513402

514403
let is_malformed_http = matches!(r#type, TransportType::Http) && url.is_empty();
515404
let is_malformed_stdio = matches!(r#type, TransportType::Stdio) && command_as_str.is_empty();
@@ -526,6 +415,13 @@ impl McpClientService {
526415

527416
match r#type {
528417
TransportType::Stdio => {
418+
let CustomToolConfig {
419+
command: command_as_str,
420+
args,
421+
env: config_envs,
422+
..
423+
} = &mut self.config;
424+
529425
let context = |input: &str| Ok(os.env.get(input).ok());
530426
let home_dir = || os.env.home().map(|p| p.to_string_lossy().to_string());
531427
let expanded_cmd = shellexpand::full_with_context(command_as_str, home_dir, context)?;
@@ -544,12 +440,28 @@ impl McpClientService {
544440
let (tokio_child_process, child_stderr) =
545441
TokioChildProcess::builder(command).stderr(Stdio::piped()).spawn()?;
546442

547-
Ok(Transport::Stdio((tokio_child_process, child_stderr)))
443+
let service = self
444+
.into_dyn()
445+
.serve::<TokioChildProcess, _, _>(tokio_child_process)
446+
.await
447+
.map_err(Box::new)?;
448+
449+
Ok((service, child_stderr, None))
548450
},
549451
TransportType::Http => {
550-
let http_transport = get_http_transport(os, url, *timeout, scopes, headers, None, messenger).await?;
452+
let CustomToolConfig {
453+
url,
454+
headers,
455+
oauth_scopes: scopes,
456+
timeout,
457+
..
458+
} = &self.config;
459+
460+
let http_service_builder = HttpServiceBuilder::new(url, os, url, *timeout, scopes, headers, messenger);
461+
462+
let (service, auth_client_wrapper) = http_service_builder.try_build(&self).await?;
551463

552-
Ok(Transport::Http(http_transport))
464+
Ok((service, None, auth_client_wrapper))
553465
},
554466
}
555467
}
@@ -620,7 +532,7 @@ impl Service<RoleClient> for McpClientService {
620532
_context: rmcp::service::RequestContext<RoleClient>,
621533
) -> Result<<RoleClient as rmcp::service::ServiceRole>::Resp, rmcp::ErrorData> {
622534
match request {
623-
ServerRequest::PingRequest(_) => Err(rmcp::ErrorData::method_not_found::<rmcp::model::PingRequestMethod>()),
535+
ServerRequest::PingRequest(_) => Ok(ClientResult::empty(())),
624536
ServerRequest::CreateMessageRequest(_) => Err(rmcp::ErrorData::method_not_found::<
625537
rmcp::model::CreateMessageRequestMethod,
626538
>()),
@@ -660,6 +572,7 @@ impl Service<RoleClient> for McpClientService {
660572
client_info: Implementation {
661573
name: "Q DEV CLI".to_string(),
662574
version: "1.0.0".to_string(),
575+
..Default::default()
663576
},
664577
}
665578
}

0 commit comments

Comments
 (0)