@@ -5,7 +5,20 @@ use std::process::Stdio;
55use regex:: Regex ;
66use reqwest:: Client ;
77use rmcp:: model:: {
8- CallToolRequestParam , CallToolResult , ErrorCode , GetPromptRequestParam , GetPromptResult , Implementation , InitializeRequestParam , ListPromptsResult , ListToolsResult , LoggingLevel , LoggingMessageNotificationParam , PaginatedRequestParam , ServerNotification , ServerRequest
8+ CallToolRequestParam ,
9+ CallToolResult ,
10+ ErrorCode ,
11+ GetPromptRequestParam ,
12+ GetPromptResult ,
13+ Implementation ,
14+ InitializeRequestParam ,
15+ ListPromptsResult ,
16+ ListToolsResult ,
17+ LoggingLevel ,
18+ LoggingMessageNotificationParam ,
19+ PaginatedRequestParam ,
20+ ServerNotification ,
21+ ServerRequest ,
922} ;
1023use rmcp:: service:: {
1124 ClientInitializeError ,
@@ -31,6 +44,7 @@ use tokio::process::{
3144} ;
3245use tokio:: task:: JoinHandle ;
3346use tracing:: {
47+ debug,
3448 error,
3549 info,
3650} ;
@@ -152,7 +166,7 @@ macro_rules! decorate_with_auth_retry {
152166 // TODO: discern error type prior to retrying
153167 // Not entirely sure what is thrown when auth is required
154168 if let Some ( auth_client) = self . get_auth_client( ) {
155- let refresh_result = auth_client. auth_manager . lock ( ) . await . refresh_token ( ) . await ;
169+ let refresh_result = auth_client. get_access_token ( ) . await ;
156170 match refresh_result {
157171 Ok ( _) => {
158172 // Retry the operation after token refresh
@@ -163,8 +177,11 @@ macro_rules! decorate_with_auth_retry {
163177 } ,
164178 Err ( _) => {
165179 // If refresh fails, return the original error
180+ // Currently our event loop just does not allow us easy ways to
181+ // reauth entirely once a session starts since this would mean
182+ // swapping of transport (which also means swapping of client)
166183 Err ( e)
167- }
184+ } ,
168185 }
169186 } else {
170187 // No auth client available, return original error
@@ -176,6 +193,11 @@ macro_rules! decorate_with_auth_retry {
176193 } ;
177194}
178195
196+ /// Wrapper around rmcp service types to enable cloning.
197+ ///
198+ /// This exists because `rmcp::service::RunningService` is not directly cloneable as it is a
199+ /// pointer type to `Peer<C>`. This enum allows us to hold either the original service or its
200+ /// peer representation, enabling cloning by converting the original service to a peer when needed.
179201pub enum InnerService {
180202 Original ( rmcp:: service:: RunningService < RoleClient , Box < dyn DynService < RoleClient > > > ) ,
181203 Peer ( rmcp:: service:: Peer < RoleClient > ) ,
@@ -194,11 +216,22 @@ impl Clone for InnerService {
194216 fn clone ( & self ) -> Self {
195217 match self {
196218 InnerService :: Original ( rs) => InnerService :: Peer ( ( * rs) . clone ( ) ) ,
197- InnerService :: Peer ( peer) => InnerService :: Peer ( peer. clone ( ) )
219+ InnerService :: Peer ( peer) => InnerService :: Peer ( peer. clone ( ) ) ,
198220 }
199221 }
200222}
201223
224+ /// A wrapper around MCP (Model Context Protocol) service instances that manages
225+ /// authentication and enables cloning functionality.
226+ ///
227+ /// This struct holds either an original `RunningService` or its peer representation,
228+ /// along with an optional authentication drop guard for managing OAuth tokens.
229+ /// The authentication drop guard handles token lifecycle and cleanup when the
230+ /// service is dropped.
231+ ///
232+ /// # Fields
233+ /// * `inner_service` - The underlying MCP service instance (original or peer)
234+ /// * `auth_dropguard` - Optional authentication manager for OAuth token handling
202235#[ derive( Debug ) ]
203236pub struct RunningService {
204237 pub inner_service : InnerService ,
@@ -215,18 +248,19 @@ impl Clone for RunningService {
215248
216249 RunningService {
217250 inner_service : self . inner_service . clone ( ) ,
218- auth_dropguard
251+ auth_dropguard,
219252 }
220253 }
221254}
222255
223256impl RunningService {
257+ decorate_with_auth_retry ! ( CallToolRequestParam , call_tool, CallToolResult ) ;
258+
259+ decorate_with_auth_retry ! ( GetPromptRequestParam , get_prompt, GetPromptResult ) ;
260+
224261 pub fn get_auth_client ( & self ) -> Option < AuthClient < Client > > {
225262 self . auth_dropguard . as_ref ( ) . map ( |a| a. auth_client . clone ( ) )
226263 }
227-
228- decorate_with_auth_retry ! ( CallToolRequestParam , call_tool, CallToolResult ) ;
229- decorate_with_auth_retry ! ( GetPromptRequestParam , get_prompt, GetPromptResult ) ;
230264}
231265
232266pub type StdioTransport = ( TokioChildProcess , Option < ChildStderr > ) ;
@@ -304,16 +338,17 @@ impl McpClientService {
304338 let service = match self . into_dyn ( ) . serve ( transport) . await . map_err ( Box :: new) {
305339 Ok ( service) => service,
306340 Err ( e) if matches ! ( * e, ClientInitializeError :: ConnectionClosed ( _) ) => {
341+ debug ! ( "## mcp: first hand shake attempt failed: {:?}" , e) ;
307342 let refresh_res =
308- auth_dg. auth_client . auth_manager . lock ( ) . await . refresh_token ( ) . await ;
343+ auth_dg. auth_client . get_access_token ( ) . await ;
309344 let new_self = McpClientService :: new (
310345 server_name. clone ( ) ,
311346 backup_config,
312347 messenger_clone. clone ( ) ,
313348 ) ;
314349
315350 let new_transport =
316- get_http_transport ( & os_clone, true , & url, & * messenger_dup) . await ?;
351+ get_http_transport ( & os_clone, true , & url, Some ( auth_dg . auth_client . clone ( ) ) , & * messenger_dup) . await ?;
317352
318353 match new_transport {
319354 HttpTransport :: WithAuth ( ( new_transport, new_auth_dg) ) => {
@@ -325,13 +360,14 @@ impl McpClientService {
325360 new_self. into_dyn ( ) . serve ( new_transport) . await . map_err ( Box :: new) ?
326361 } ,
327362 Err ( e) => {
363+ error ! ( "## mcp: token refresh attempt failed: {:?}" , e) ;
328364 info ! ( "Retry for http transport failed {e}. Possible reauth needed" ) ;
329365 // This could be because the refresh token is expired, in which
330366 // case we would need to have user go through the auth flow
331367 // again
332368 let new_transport =
333- get_http_transport ( & os_clone, true , & url, & * messenger_dup) . await ?;
334-
369+ get_http_transport ( & os_clone, true , & url, None , & * messenger_dup) . await ?;
370+
335371 match new_transport {
336372 HttpTransport :: WithAuth ( ( new_transport, new_auth_dg) ) => {
337373 auth_dg = new_auth_dg;
@@ -345,7 +381,7 @@ impl McpClientService {
345381 } ,
346382 }
347383 } ,
348- HttpTransport :: WithoutAuth ( new_transport) =>
384+ HttpTransport :: WithoutAuth ( new_transport) =>
349385 new_self. into_dyn ( ) . serve ( new_transport) . await . map_err ( Box :: new) ?,
350386 }
351387 } ,
@@ -487,7 +523,7 @@ impl McpClientService {
487523 Ok ( Transport :: Stdio ( ( tokio_child_process, child_stderr) ) )
488524 } ,
489525 TransportType :: Http => {
490- let http_transport = get_http_transport ( os, false , url, messenger) . await ?;
526+ let http_transport = get_http_transport ( os, false , url, None , messenger) . await ?;
491527
492528 Ok ( Transport :: Http ( http_transport) )
493529 } ,
0 commit comments