22from contextlib import asynccontextmanager
33from typing import Any , cast
44from uuid import UUID
5- from pydantic import ValidationError
65
76import anyio
87from anyio import CapacityLimiter , lowlevel
8+ from pydantic import ValidationError
99
1010import mcp .types as types
1111from mcp .server .message_queue .base import MessageCallback
@@ -68,7 +68,7 @@ async def subscribe(self, session_id: UUID, callback: MessageCallback):
6868 await self ._pubsub .unsubscribe (channel ) # type: ignore
6969 await self ._redis .srem (self ._active_sessions_key , session_id .hex )
7070 del self ._callbacks [session_id ]
71- logger .debug (f"Unsubscribed from Redis channel for session { session_id } " )
71+ logger .debug (f"Unsubscribed from Redis channel: { session_id } " )
7272
7373 async def _listen_for_messages (self ) -> None :
7474 """Background task that listens for messages on subscribed channels."""
@@ -84,37 +84,37 @@ async def _listen_for_messages(self) -> None:
8484
8585 channel : str = cast (str , message ["channel" ])
8686 expected_prefix = f"{ self ._prefix } session:"
87-
87+
8888 if not channel .startswith (expected_prefix ):
8989 logger .debug (f"Ignoring message from non-MCP channel: { channel } " )
9090 continue
91-
92- session_hex = channel [len (expected_prefix ):]
91+
92+ session_hex = channel [len (expected_prefix ) :]
9393 try :
9494 session_id = UUID (hex = session_hex )
9595 expected_channel = self ._session_channel (session_id )
9696 if channel != expected_channel :
9797 logger .error (f"Channel format mismatch: { channel } " )
9898 continue
9999 except ValueError :
100- logger .error (f"Received message with invalid UUID in channel: { channel } " )
100+ logger .error (f"Invalid UUID in channel: { channel } " )
101101 continue
102102
103103 data : str = cast (str , message ["data" ])
104104 try :
105105 if session_id not in self ._callbacks :
106- logger .warning (f"Message dropped: no callback for session { session_id } " )
106+ logger .warning (f"Message dropped: no callback for { session_id } " )
107107 continue
108-
108+
109109 # Try to parse as valid message or recreate original ValidationError
110110 try :
111111 msg = types .JSONRPCMessage .model_validate_json (data )
112112 await self ._callbacks [session_id ](msg )
113113 except ValidationError as exc :
114- # Pass the identical validation error that would have occurred originally
114+ # Pass the identical validation error that would have occurred
115115 await self ._callbacks [session_id ](exc )
116116 except Exception as e :
117- logger .error (f"Error processing message for session { session_id } : { e } " )
117+ logger .error (f"Error processing message for { session_id } : { e } " )
118118
119119 async def publish_message (
120120 self , session_id : UUID , message : types .JSONRPCMessage | str
0 commit comments