@@ -108,6 +108,7 @@ def __init__(
108108 receive_notification_type : type [ReceiveNotificationT ],
109109 # If none, reading will never time out
110110 read_timeout_seconds : timedelta | None = None ,
111+ cleanup_interval_seconds : float = 60.0 ,
111112 ) -> None :
112113 self ._read_stream = read_stream
113114 self ._write_stream = write_stream
@@ -116,6 +117,8 @@ def __init__(
116117 self ._receive_request_type = receive_request_type
117118 self ._receive_notification_type = receive_notification_type
118119 self ._read_timeout_seconds = read_timeout_seconds
120+ self ._cleanup_interval = cleanup_interval_seconds
121+ self ._in_flight : dict [RequestId , RequestResponder ] = {}
119122
120123 self ._incoming_message_stream_writer , self ._incoming_message_stream_reader = (
121124 anyio .create_memory_object_stream [
@@ -129,6 +132,7 @@ async def __aenter__(self):
129132 self ._task_group = anyio .create_task_group ()
130133 await self ._task_group .__aenter__ ()
131134 self ._task_group .start_soon (self ._receive_loop )
135+ self ._task_group .start_soon (self ._cleanup_loop )
132136 return self
133137
134138 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
@@ -221,8 +225,20 @@ async def _send_response(
221225 )
222226 await self ._write_stream .send (JSONRPCMessage (jsonrpc_response ))
223227
228+ async def _cleanup_loop (self ) -> None :
229+ """Periodically clean up completed and cancelled requests."""
230+ while True :
231+ with anyio .move_on_after (self ._cleanup_interval ):
232+ # Clean up completed requests
233+ self ._in_flight = {
234+ req_id : responder
235+ for req_id , responder in self ._in_flight .items ()
236+ if responder .in_flight
237+ }
238+ await anyio .sleep (self ._cleanup_interval )
239+
224240 async def _receive_loop (self ) -> None :
225- in_flight : dict [ RequestId , RequestResponder ] = {}
241+ """Handle incoming messages and maintain request state."""
226242
227243 async with (
228244 self ._read_stream ,
@@ -231,9 +247,9 @@ async def _receive_loop(self) -> None:
231247 ):
232248 async for message in self ._read_stream :
233249 # Clean up completed requests
234- in_flight = {
250+ self . _in_flight = {
235251 req_id : responder
236- for req_id , responder in in_flight .items ()
252+ for req_id , responder in self . _in_flight .items ()
237253 if responder .in_flight
238254 }
239255
@@ -257,7 +273,7 @@ async def _receive_loop(self) -> None:
257273 cancel_scope = scope ,
258274 )
259275
260- in_flight [message .root .id ] = responder
276+ self . _in_flight [message .root .id ] = responder
261277
262278 await self ._received_request (responder )
263279 if not responder ._responded :
@@ -272,8 +288,8 @@ async def _receive_loop(self) -> None:
272288 # Handle cancellation notifications
273289 if isinstance (notification .root , CancelledNotification ):
274290 cancelled_id = notification .root .params .requestId
275- if cancelled_id in in_flight :
276- await in_flight [cancelled_id ].cancel ()
291+ if cancelled_id in self . _in_flight :
292+ await self . _in_flight [cancelled_id ].cancel ()
277293 else :
278294 await self ._received_notification (notification )
279295 await self ._incoming_message_stream_writer .send (notification )
0 commit comments