@@ -692,20 +692,25 @@ async def _recv(self, recd: bytes) -> None:
692
692
if "id" in response :
693
693
async with self ._lock :
694
694
self ._inflight .pop (response ["id" ])
695
- self ._received [response ["id" ]].set_result (response )
696
- self ._in_use_ids .remove (response ["id" ])
695
+ with suppress (KeyError ):
696
+ # These would be subscriptions that were unsubscribed
697
+ self ._received [response ["id" ]].set_result (response )
698
+ self ._in_use_ids .remove (response ["id" ])
697
699
elif "params" in response :
698
700
# TODO self._inflight won't work with subscriptions
699
701
sub_id = response ["params" ]["subscription" ]
700
- logger .debug (f"Adding { sub_id } to subscriptions." )
702
+ if sub_id not in self ._received_subscriptions :
703
+ self ._received_subscriptions [sub_id ] = asyncio .Queue ()
701
704
await self ._received_subscriptions [sub_id ].put (response )
702
705
else :
703
706
raise KeyError (response )
704
707
705
708
async def _start_receiving (self , ws : ClientConnection ) -> Exception :
706
709
try :
707
710
while True :
708
- recd = await asyncio .wait_for (ws .recv (decode = False ), timeout = self .retry_timeout )
711
+ recd = await asyncio .wait_for (
712
+ ws .recv (decode = False ), timeout = self .retry_timeout
713
+ )
709
714
await self ._recv (recd )
710
715
except Exception as e :
711
716
logger .exception ("Start receving exception" , exc_info = e )
@@ -774,7 +779,12 @@ async def unsubscribe(self, subscription_id: str) -> None:
774
779
original_id = get_next_id ()
775
780
del self ._received_subscriptions [subscription_id ]
776
781
777
- to_send = {"jsonrpc" : "2.0" , "method" : "author_unwatchExtrinsic" , "params" : [subscription_id ]}
782
+ to_send = {
783
+ "jsonrpc" : "2.0" ,
784
+ "id" : original_id ,
785
+ "method" : "author_unwatchExtrinsic" ,
786
+ "params" : [subscription_id ],
787
+ }
778
788
await self ._sending .put (to_send )
779
789
780
790
async def retrieve (self , item_id : str ) -> Optional [dict ]:
0 commit comments