88
99import httpx
1010from httpx_sse import connect_sse , EventSource
11+ from dify_plugin .config .logger_format import plugin_logger_handler
1112
13+ logger = logging .getLogger (__name__ )
14+ logger .setLevel (logging .DEBUG )
15+ logger .addHandler (plugin_logger_handler )
1216
1317class McpClient (ABC ):
1418 """Interface for MCP client."""
@@ -61,38 +65,38 @@ def __init__(self, name: str, url: str,
6165
6266 def _listen_messages (self ) -> None :
6367 try :
64- logging .info (f"{ self .name } - Connecting to SSE endpoint: { remove_request_params (self .url )} " )
68+ logger .info (f"{ self .name } - Connecting to SSE endpoint: { remove_request_params (self .url )} " )
6569 with connect_sse (
6670 client = self .client ,
6771 method = "GET" ,
6872 url = self .url ,
6973 timeout = httpx .Timeout (self .timeout , read = self .sse_read_timeout ),
7074 ) as event_source :
7175 event_source .response .raise_for_status ()
72- logging .debug (f"{ self .name } - SSE connection established" )
76+ logger .debug (f"{ self .name } - SSE connection established" )
7377 for sse in event_source .iter_sse ():
74- logging .debug (f"{ self .name } - Received SSE event: { sse .event } " )
78+ logger .debug (f"{ self .name } - Received SSE event: { sse .event } " )
7579 if self .should_stop .is_set ():
7680 break
7781 match sse .event :
7882 case "endpoint" :
7983 self .endpoint_url = urljoin (self .url , sse .data )
80- logging .info (f"{ self .name } - Received endpoint URL: { self .endpoint_url } " )
84+ logger .info (f"{ self .name } - Received endpoint URL: { self .endpoint_url } " )
8185 self ._connected .set ()
8286 url_parsed = urlparse (self .url )
8387 endpoint_parsed = urlparse (self .endpoint_url )
8488 if (url_parsed .netloc != endpoint_parsed .netloc
8589 or url_parsed .scheme != endpoint_parsed .scheme ):
8690 error_msg = f"{ self .name } - Endpoint origin does not match connection origin: { self .endpoint_url } "
87- logging .error (error_msg )
91+ logger .error (error_msg )
8892 raise ValueError (error_msg )
8993 case "message" :
9094 message = json .loads (sse .data )
91- logging .debug (f"{ self .name } - Received server message: { message } " )
95+ logger .debug (f"{ self .name } - Received server message: { message } " )
9296 self .message_dict [message ["id" ]] = message
9397 self .response_ready .set ()
9498 case _:
95- logging .warning (f"{ self .name } - Unknown SSE event: { sse .event } " )
99+ logger .warning (f"{ self .name } - Unknown SSE event: { sse .event } " )
96100 except Exception as e :
97101 self ._thread_exception = e
98102 self ._error_event .set ()
@@ -104,7 +108,7 @@ def send_message(self, data: dict):
104108 raise ConnectionError (f"{ self .name } - MCP Server connection failed: { self ._thread_exception } " )
105109 else :
106110 raise RuntimeError (f"{ self .name } - Please call connect() first" )
107- logging .debug (f"{ self .name } - Sending client message: { data } " )
111+ logger .debug (f"{ self .name } - Sending client message: { data } " )
108112 response = self .client .post (
109113 url = self .endpoint_url ,
110114 json = data ,
@@ -113,15 +117,18 @@ def send_message(self, data: dict):
113117 follow_redirects = True ,
114118 )
115119 response .raise_for_status ()
120+ logger .info (f"response status: { response .status_code } { response .reason_phrase } " )
116121 if not response .is_success :
117- raise ValueError (f"{ self .name } - MCP Server response: { response .status_code } { response .reason_phrase } " )
122+ raise ValueError (f"{ self .name } - MCP Server response: { response .status_code } { response .reason_phrase } ( { response . content } ) " )
118123 if "id" in data :
119124 message_id = data ["id" ]
120125 while True :
121126 self .response_ready .wait ()
122127 self .response_ready .clear ()
123128 if message_id in self .message_dict :
129+ logger .info (f"message_id: { message_id } " )
124130 message = self .message_dict .pop (message_id , None )
131+ logger .info (f"message: { message } " )
125132 return message
126133 return {}
127134
@@ -228,19 +235,22 @@ def send_message(self, data: dict):
228235 headers = {"Content-Type" : "application/json" , "Accept" : "application/json, text/event-stream" }
229236 if self .session_id :
230237 headers ["Mcp-Session-Id" ] = self .session_id
231- logging .debug (f"{ self .name } - Sending client message: { data } " )
238+ logger .debug (f"{ self .name } - Sending client message: { data } " )
232239 response = self .client .post (
233240 url = self .url ,
234241 json = data ,
235242 headers = headers ,
236243 timeout = self .timeout ,
237244 follow_redirects = True ,
238245 )
246+ logger .info (f"response status: { response .status_code } { response .reason_phrase } " )
239247 if not response .is_success :
240- raise ValueError (f"{ self .name } - MCP Server response: { response .status_code } { response .reason_phrase } " )
248+ raise ValueError (f"{ self .name } - MCP Server response: { response .status_code } { response .reason_phrase } ({ response .content } )" )
249+ logger .info (f"response headers: { response .headers } " )
241250 if "mcp-session-id" in response .headers :
242251 self .session_id = response .headers .get ("mcp-session-id" )
243252 content_type = response .headers .get ("content-type" , "None" )
253+ logger .info (f"response content: { response .content } " )
244254 message = {}
245255 if content_type == "text/event-stream" :
246256 for sse in EventSource (response ).iter_sse ():
@@ -251,6 +261,7 @@ def send_message(self, data: dict):
251261 message = (response .json () if response .content else None ) or {}
252262 else :
253263 raise Exception (f"{ self .name } - Unsupported Content-Type: { content_type } " )
264+ logger .info (f"message: { message } " )
254265 return message
255266
256267 def initialize (self ):
@@ -367,19 +378,19 @@ def execute_tool(self, tool_name: str, tool_args: dict[str, Any]):
367378 progress = result ["progress" ]
368379 total = result ["total" ]
369380 percentage = (progress / total ) * 100
370- logging .info (
381+ logger .info (
371382 f"Progress: { progress } /{ total } "
372383 f"({ percentage :.1f} %)"
373384 )
374385 return f"Tool execution result: { result } "
375386 except Exception as e :
376387 error_msg = f"Error executing tool: { str (e )} "
377- logging .error (error_msg )
388+ logger .error (error_msg )
378389 return error_msg
379390
380391 def close (self ) -> None :
381392 for client in self ._clients .values ():
382393 try :
383394 client .close ()
384395 except Exception as e :
385- logging .error (e )
396+ logger .error (e )
0 commit comments