@@ -543,6 +543,7 @@ def __init__(
543
543
self .max_connections = max_connections
544
544
self .shutdown_timer = shutdown_timer
545
545
self ._received : dict [str , asyncio .Future ] = {}
546
+ self ._received_subscriptions : dict [str , asyncio .Queue ] = {}
546
547
self ._sending = asyncio .Queue ()
547
548
self ._receiving_task = None # TODO rename, as this now does send/recv
548
549
self ._attempts = 0
@@ -673,7 +674,8 @@ async def _recv(self, recd: bytes) -> None:
673
674
self ._received [response ["id" ]].set_result (response )
674
675
self ._in_use_ids .remove (response ["id" ])
675
676
elif "params" in response :
676
- self ._received [response ["params" ]["subscription" ]].set_result (response )
677
+ sub_id = response ["params" ]["subscription" ]
678
+ await self ._received_subscriptions [sub_id ].put (response )
677
679
else :
678
680
raise KeyError (response )
679
681
@@ -708,6 +710,9 @@ async def _start_sending(self, ws) -> Exception:
708
710
self ._received [i ].cancel ()
709
711
return
710
712
713
+ async def add_subscription (self , subscription_id : str ) -> None :
714
+ self ._received_subscriptions [subscription_id ] = asyncio .Queue ()
715
+
711
716
async def send (self , payload : dict ) -> str :
712
717
"""
713
718
Sends a payload to the websocket connection.
@@ -729,7 +734,7 @@ async def send(self, payload: dict) -> str:
729
734
await self ._sending .put (to_send )
730
735
return original_id
731
736
732
- async def retrieve (self , item_id : int ) -> Optional [dict ]:
737
+ async def retrieve (self , item_id : str ) -> Optional [dict ]:
733
738
"""
734
739
Retrieves a single item from received responses dict queue
735
740
@@ -739,14 +744,20 @@ async def retrieve(self, item_id: int) -> Optional[dict]:
739
744
Returns:
740
745
retrieved item
741
746
"""
742
- item : asyncio .Future = self ._received .get (item_id )
743
- if item .done ():
744
- self .max_subscriptions .release ()
745
- del self ._received [item_id ]
746
- return item .result ()
747
+ item : Optional [asyncio .Future ] = self ._received .get (item_id )
748
+ if item is not None :
749
+ if item .done ():
750
+ self .max_subscriptions .release ()
751
+ del self ._received [item_id ]
752
+ return item .result ()
747
753
else :
748
- await asyncio .sleep (0.1 )
749
- return None
754
+ try :
755
+ return self ._received_subscriptions [item_id ].get_nowait ()
756
+ # TODO make sure to delete during unsubscribe
757
+ except asyncio .QueueEmpty :
758
+ pass
759
+ await asyncio .sleep (0.1 )
760
+ return None
750
761
751
762
752
763
class AsyncSubstrateInterface (SubstrateMixin ):
@@ -2304,6 +2315,7 @@ async def _make_rpc_request(
2304
2315
item_id = request_manager .overwrite_request (
2305
2316
item_id , response ["result" ]
2306
2317
)
2318
+ await ws .add_subscription (response ["result" ])
2307
2319
subscription_added = True
2308
2320
except KeyError :
2309
2321
raise SubstrateRequestException (str (response ))
@@ -2347,12 +2359,13 @@ async def _make_rpc_request(
2347
2359
f"Retrying attempt { attempt + 1 } of { self .max_retries } "
2348
2360
)
2349
2361
return await self ._make_rpc_request (
2350
- payloads ,
2351
- value_scale_type ,
2352
- storage_item ,
2353
- result_handler ,
2354
- attempt + 1 ,
2355
- force_legacy_decode ,
2362
+ payloads = payloads ,
2363
+ value_scale_type = value_scale_type ,
2364
+ storage_item = storage_item ,
2365
+ result_handler = result_handler ,
2366
+ attempt = attempt + 1 ,
2367
+ runtime = runtime ,
2368
+ force_legacy_decode = force_legacy_decode ,
2356
2369
)
2357
2370
2358
2371
return request_manager .get_results ()
0 commit comments