@@ -636,7 +636,7 @@ async def _handler(self, ws: ClientConnection) -> None:
636
636
[recv_task , send_task ],
637
637
return_when = asyncio .FIRST_COMPLETED ,
638
638
)
639
- logger .debug ("send/recv tasks done. " )
639
+ logger .debug (f "send/recv tasks done: { done } \n { pending } " )
640
640
loop = asyncio .get_running_loop ()
641
641
should_reconnect = False
642
642
for task in pending :
@@ -652,6 +652,10 @@ async def _handler(self, ws: ClientConnection) -> None:
652
652
logger .info ("Timeout occurred. Reconnecting." )
653
653
await self .connect (True )
654
654
await self ._handler (ws = ws )
655
+ elif isinstance (e := recv_task .result (), Exception ):
656
+ return e
657
+ elif isinstance (e := send_task .result (), Exception ):
658
+ return e
655
659
656
660
async def __aexit__ (self , exc_type , exc_val , exc_tb ):
657
661
if not self .state != State .CONNECTING :
@@ -699,26 +703,25 @@ async def _recv(self, recd: bytes) -> None:
699
703
elif "params" in response :
700
704
# TODO self._inflight won't work with subscriptions
701
705
sub_id = response ["params" ]["subscription" ]
706
+ logger .debug (f"Adding { sub_id } to subscriptions." )
702
707
await self ._received_subscriptions [sub_id ].put (response )
703
708
else :
704
709
raise KeyError (response )
705
710
706
711
async def _start_receiving (self , ws : ClientConnection ) -> Exception :
707
712
try :
708
713
while True :
709
- if self ._inflight :
710
- recd = await asyncio .wait_for (ws .recv (decode = False ), timeout = self .retry_timeout )
711
- await self ._recv (recd )
712
- else :
713
- await asyncio .sleep (0.1 )
714
+ recd = await asyncio .wait_for (ws .recv (decode = False ), timeout = self .retry_timeout )
715
+ await self ._recv (recd )
714
716
except Exception as e :
717
+ logger .exception ("Start receving exception" , exc_info = e )
715
718
if isinstance (e , ssl .SSLError ):
716
719
e = ConnectionClosed
717
720
for fut in self ._received .values ():
718
721
if not fut .done ():
719
722
fut .set_exception (e )
720
723
fut .cancel ()
721
- return
724
+ return e
722
725
723
726
async def _start_sending (self , ws ) -> Exception :
724
727
to_send = None
@@ -742,9 +745,10 @@ async def _start_sending(self, ws) -> Exception:
742
745
for i in self ._received .keys ():
743
746
self ._received [i ].set_exception (e )
744
747
self ._received [i ].cancel ()
745
- return
748
+ return e
746
749
747
750
async def add_subscription (self , subscription_id : str ) -> None :
751
+ logger .debug (f"Adding { subscription_id } to subscriptions." )
748
752
self ._received_subscriptions [subscription_id ] = asyncio .Queue ()
749
753
750
754
async def send (self , payload : dict ) -> str :
@@ -770,6 +774,22 @@ async def send(self, payload: dict) -> str:
770
774
logger .debug ("767 queue put" )
771
775
return original_id
772
776
777
+ async def unsubscribe (self , subscription_id : str ) -> None :
778
+ """
779
+ Unwatches a watched extrinsic subscription.
780
+
781
+ Args:
782
+ subscription_id: the internal ID of the subscription (typically a hex string)
783
+ """
784
+ async with self ._lock :
785
+ original_id = get_next_id ()
786
+ while original_id in self ._in_use_ids :
787
+ original_id = get_next_id ()
788
+ del self ._received_subscriptions [subscription_id ]
789
+
790
+ to_send = {"jsonrpc" : "2.0" , "method" : "author_unwatchExtrinsic" , "params" : [subscription_id ]}
791
+ await self ._sending .put (to_send )
792
+
773
793
async def retrieve (self , item_id : str ) -> Optional [dict ]:
774
794
"""
775
795
Retrieves a single item from received responses dict queue
@@ -789,9 +809,11 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
789
809
else :
790
810
try :
791
811
return self ._received_subscriptions [item_id ].get_nowait ()
792
- # TODO make sure to delete during unsubscribe
793
812
except asyncio .QueueEmpty :
794
813
pass
814
+ if self ._send_recv_task .done ():
815
+ if isinstance (e := self ._send_recv_task .result (), Exception ):
816
+ raise e
795
817
await asyncio .sleep (0.1 )
796
818
return None
797
819
@@ -3776,10 +3798,8 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]:
3776
3798
}
3777
3799
3778
3800
if "finalized" in message_result and wait_for_finalization :
3779
- # Created as a task because we don't actually care about the result
3780
- self ._forgettable_task = asyncio .create_task (
3781
- self .rpc_request ("author_unwatchExtrinsic" , [subscription_id ])
3782
- )
3801
+ async with self .ws as ws :
3802
+ await ws .unsubscribe (subscription_id )
3783
3803
return {
3784
3804
"block_hash" : message_result ["finalized" ],
3785
3805
"extrinsic_hash" : "0x{}" .format (extrinsic .extrinsic_hash .hex ()),
@@ -3790,10 +3810,8 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]:
3790
3810
and wait_for_inclusion
3791
3811
and not wait_for_finalization
3792
3812
):
3793
- # Created as a task because we don't actually care about the result
3794
- self ._forgettable_task = asyncio .create_task (
3795
- self .rpc_request ("author_unwatchExtrinsic" , [subscription_id ])
3796
- )
3813
+ async with self .ws as ws :
3814
+ await ws .unsubscribe (subscription_id )
3797
3815
return {
3798
3816
"block_hash" : message_result ["inblock" ],
3799
3817
"extrinsic_hash" : "0x{}" .format (extrinsic .extrinsic_hash .hex ()),
0 commit comments