@@ -3,7 +3,6 @@ use std::collections::HashMap;
33use std:: process:: Stdio ;
44
55use regex:: Regex ;
6- use reqwest:: Client ;
76use rmcp:: model:: {
87 CallToolRequestParam ,
98 CallToolResult ,
@@ -25,7 +24,6 @@ use rmcp::service::{
2524 DynService ,
2625 NotificationContext ,
2726} ;
28- use rmcp:: transport:: auth:: AuthClient ;
2927use rmcp:: transport:: {
3028 ConfigureCommandExt ,
3129 TokioChildProcess ,
@@ -52,7 +50,7 @@ use tracing::{
5250use super :: messenger:: Messenger ;
5351use super :: oauth_util:: HttpTransport ;
5452use super :: {
55- AuthClientDropGuard ,
53+ AuthClientWrapper ,
5654 OauthUtilError ,
5755 get_http_transport,
5856} ;
@@ -175,10 +173,11 @@ macro_rules! decorate_with_auth_retry {
175173 Err ( e) => {
176174 // TODO: discern error type prior to retrying
177175 // Not entirely sure what is thrown when auth is required
178- if let Some ( auth_client) = self . get_auth_client ( ) {
179- let refresh_result = auth_client. auth_manager . lock ( ) . await . refresh_token( ) . await ;
176+ if let Some ( auth_client) = self . auth_client . as_ref ( ) {
177+ let refresh_result = auth_client. refresh_token( ) . await ;
180178 match refresh_result {
181179 Ok ( _) => {
180+ info!( "Token refreshed" ) ;
182181 // Retry the operation after token refresh
183182 match & self . inner_service {
184183 InnerService :: Original ( rs) => rs. $method_name( param) . await ,
@@ -245,20 +244,14 @@ impl Clone for InnerService {
245244#[ derive( Debug ) ]
246245pub struct RunningService {
247246 pub inner_service : InnerService ,
248- auth_dropguard : Option < AuthClientDropGuard > ,
247+ auth_client : Option < AuthClientWrapper > ,
249248}
250249
251250impl Clone for RunningService {
252251 fn clone ( & self ) -> Self {
253- let auth_dropguard = self . auth_dropguard . as_ref ( ) . map ( |dg| {
254- let mut dg = dg. clone ( ) ;
255- dg. should_write = false ;
256- dg
257- } ) ;
258-
259252 RunningService {
260253 inner_service : self . inner_service . clone ( ) ,
261- auth_dropguard ,
254+ auth_client : self . auth_client . clone ( ) ,
262255 }
263256 }
264257}
@@ -267,10 +260,6 @@ impl RunningService {
267260 decorate_with_auth_retry ! ( CallToolRequestParam , call_tool, CallToolResult ) ;
268261
269262 decorate_with_auth_retry ! ( GetPromptRequestParam , get_prompt, GetPromptResult ) ;
270-
271- pub fn get_auth_client ( & self ) -> Option < AuthClient < Client > > {
272- self . auth_dropguard . as_ref ( ) . map ( |a| a. auth_client . clone ( ) )
273- }
274263}
275264
276265pub type StdioTransport = ( TokioChildProcess , Option < ChildStderr > ) ;
@@ -341,32 +330,30 @@ impl McpClientService {
341330 } ,
342331 Transport :: Http ( http_transport) => {
343332 match http_transport {
344- HttpTransport :: WithAuth ( ( transport, mut auth_dg ) ) => {
333+ HttpTransport :: WithAuth ( ( transport, mut auth_client ) ) => {
345334 // The crate does not automatically refresh tokens when they expire. We
346335 // would need to handle that here
347336 let url = self . config . url . clone ( ) ;
348337 let service = match self . into_dyn ( ) . serve ( transport) . await . map_err ( Box :: new) {
349338 Ok ( service) => service,
350339 Err ( e) if matches ! ( * e, ClientInitializeError :: ConnectionClosed ( _) ) => {
351340 debug ! ( "## mcp: first hand shake attempt failed: {:?}" , e) ;
352- let refresh_res =
353- auth_dg. auth_client . auth_manager . lock ( ) . await . refresh_token ( ) . await ;
341+ let refresh_res = auth_client. refresh_token ( ) . await ;
354342 let new_self = McpClientService :: new (
355343 server_name. clone ( ) ,
356344 backup_config,
357345 messenger_clone. clone ( ) ,
358346 ) ;
359347
360348 let new_transport =
361- get_http_transport ( & os_clone, true , & url, Some ( auth_dg . auth_client . clone ( ) ) , & * messenger_dup) . await ?;
349+ get_http_transport ( & os_clone, true , & url, Some ( auth_client . auth_client . clone ( ) ) , & * messenger_dup) . await ?;
362350
363351 match new_transport {
364- HttpTransport :: WithAuth ( ( new_transport, new_auth_dg) ) => {
365- auth_dg. should_write = false ;
366- auth_dg = new_auth_dg;
352+ HttpTransport :: WithAuth ( ( new_transport, new_auth_client) ) => {
353+ auth_client = new_auth_client;
367354
368355 match refresh_res {
369- Ok ( _token ) => {
356+ Ok ( _ ) => {
370357 new_self. into_dyn ( ) . serve ( new_transport) . await . map_err ( Box :: new) ?
371358 } ,
372359 Err ( e) => {
@@ -379,9 +366,8 @@ impl McpClientService {
379366 get_http_transport ( & os_clone, true , & url, None , & * messenger_dup) . await ?;
380367
381368 match new_transport {
382- HttpTransport :: WithAuth ( ( new_transport, new_auth_dg) ) => {
383- auth_dg = new_auth_dg;
384- auth_dg. should_write = false ;
369+ HttpTransport :: WithAuth ( ( new_transport, new_auth_client) ) => {
370+ auth_client = new_auth_client;
385371 new_self. into_dyn ( ) . serve ( new_transport) . await . map_err ( Box :: new) ?
386372 } ,
387373 HttpTransport :: WithoutAuth ( new_transport) => {
@@ -398,7 +384,7 @@ impl McpClientService {
398384 Err ( e) => return Err ( e. into ( ) ) ,
399385 } ;
400386
401- ( service, None , Some ( auth_dg ) )
387+ ( service, None , Some ( auth_client ) )
402388 } ,
403389 HttpTransport :: WithoutAuth ( transport) => {
404390 let service = self . into_dyn ( ) . serve ( transport) . await . map_err ( Box :: new) ?;
@@ -496,7 +482,7 @@ impl McpClientService {
496482
497483 Ok ( RunningService {
498484 inner_service : InnerService :: Original ( service) ,
499- auth_dropguard,
485+ auth_client : auth_dropguard,
500486 } )
501487 } ) ;
502488
0 commit comments