@@ -404,6 +404,7 @@ async def disconnect(self) -> None:
404404 return
405405
406406 await self ._drain_rpc_invocation_tasks ()
407+ await self ._data_stream_tasks ()
407408
408409 req = proto_ffi .FfiRequest ()
409410 req .disconnect .room_handle = self ._ffi_handle .handle # type: ignore
@@ -444,6 +445,7 @@ async def _listen_task(self) -> None:
444445
445446 # Clean up any pending RPC invocation tasks
446447 await self ._drain_rpc_invocation_tasks ()
448+ await self ._data_stream_tasks ()
447449
448450 def _on_rpc_method_invocation (self , rpc_invocation : RpcMethodInvocationEvent ):
449451 if self ._local_participant is None :
@@ -723,6 +725,7 @@ def _on_room_event(self, event: proto_room.RoomEvent):
723725 )
724726 )
725727 self ._data_stream_tasks .add (task )
728+ task .add_done_callback (self ._data_stream_tasks .discard )
726729
727730 elif which == "stream_trailer_received" :
728731 task = asyncio .create_task (
@@ -731,6 +734,7 @@ def _on_room_event(self, event: proto_room.RoomEvent):
731734 )
732735 )
733736 self ._data_stream_tasks .add (task )
737+ task .add_done_callback (self ._data_stream_tasks .discard )
734738
735739 async def _drain_rpc_invocation_tasks (self ) -> None :
736740 if self ._rpc_invocation_tasks :
0 commit comments