9
9
import logging
10
10
import ssl
11
11
import warnings
12
+ from contextlib import suppress
12
13
from unittest .mock import AsyncMock
13
14
from hashlib import blake2b
14
15
from typing import (
@@ -524,7 +525,7 @@ def __init__(
524
525
shutdown_timer = 5 ,
525
526
options : Optional [dict ] = None ,
526
527
_log_raw_websockets : bool = False ,
527
- retry_timeout : float = 60.0
528
+ retry_timeout : float = 60.0 ,
528
529
):
529
530
"""
530
531
Websocket manager object. Allows for the use of a single websocket connection by multiple
@@ -604,7 +605,6 @@ async def _cancel(self):
604
605
)
605
606
606
607
async def connect (self , force = False ):
607
- logger .debug ("Connecting." )
608
608
now = await self .loop_time ()
609
609
self .last_received = now
610
610
self .last_sent = now
@@ -620,28 +620,24 @@ async def connect(self, force=False):
620
620
self .ws = await asyncio .wait_for (
621
621
connect (self .ws_url , ** self ._options ), timeout = 10.0
622
622
)
623
- logger .debug ("Connected." )
624
623
if self ._send_recv_task is None or self ._send_recv_task .done ():
625
624
self ._send_recv_task = asyncio .get_running_loop ().create_task (
626
625
self ._handler (self .ws )
627
626
)
628
- logger .debug ("Recd task started." )
629
627
self ._initialized = True
630
628
631
629
async def _handler (self , ws : ClientConnection ) -> None :
632
630
recv_task = asyncio .create_task (self ._start_receiving (ws ))
633
631
send_task = asyncio .create_task (self ._start_sending (ws ))
634
- logger .debug ("Starting send/recv tasks." )
635
632
done , pending = await asyncio .wait (
636
633
[recv_task , send_task ],
637
634
return_when = asyncio .FIRST_COMPLETED ,
638
635
)
639
- logger .debug (f"send/recv tasks done: { done } \n { pending } " )
640
636
loop = asyncio .get_running_loop ()
641
637
should_reconnect = False
642
638
for task in pending :
643
639
task .cancel ()
644
- if isinstance (recv_task .exception (), asyncio .TimeoutError ):
640
+ if isinstance (recv_task .result (), asyncio .TimeoutError ):
645
641
# TODO check the logic here
646
642
should_reconnect = True
647
643
if should_reconnect is True :
@@ -680,15 +676,13 @@ async def _exit_with_timer(self):
680
676
pass
681
677
682
678
async def shutdown (self ):
683
- self ._is_closing = True
684
679
try :
685
680
await asyncio .wait_for (self ._cancel (), timeout = 10.0 )
686
681
except asyncio .TimeoutError :
687
682
pass
688
683
self .ws = None
689
684
self ._initialized = False
690
685
self ._send_recv_task = None
691
- self ._is_closing = False
692
686
693
687
async def _recv (self , recd : bytes ) -> None :
694
688
if self ._log_raw_websockets :
@@ -728,7 +722,6 @@ async def _start_sending(self, ws) -> Exception:
728
722
try :
729
723
while True :
730
724
to_send_ = await self ._sending .get ()
731
- logger .debug (f"Pulled { to_send_ } from the queue" )
732
725
send_id = to_send_ ["id" ]
733
726
to_send = json .dumps (to_send_ )
734
727
async with self ._lock :
@@ -747,10 +740,6 @@ async def _start_sending(self, ws) -> Exception:
747
740
self ._received [i ].cancel ()
748
741
return e
749
742
750
- async def add_subscription (self , subscription_id : str ) -> None :
751
- logger .debug (f"Adding { subscription_id } to subscriptions." )
752
- self ._received_subscriptions [subscription_id ] = asyncio .Queue ()
753
-
754
743
async def send (self , payload : dict ) -> str :
755
744
"""
756
745
Sends a payload to the websocket connection.
@@ -762,7 +751,6 @@ async def send(self, payload: dict) -> str:
762
751
id: the internal ID of the request (incremented int)
763
752
"""
764
753
await self .max_subscriptions .acquire ()
765
- logger .debug (f"Sending payload: { payload } " )
766
754
async with self ._lock :
767
755
original_id = get_next_id ()
768
756
while original_id in self ._in_use_ids :
@@ -771,7 +759,6 @@ async def send(self, payload: dict) -> str:
771
759
self ._received [original_id ] = asyncio .get_running_loop ().create_future ()
772
760
to_send = {** payload , ** {"id" : original_id }}
773
761
await self ._sending .put (to_send )
774
- logger .debug ("767 queue put" )
775
762
return original_id
776
763
777
764
async def unsubscribe (self , subscription_id : str ) -> None :
@@ -2374,7 +2361,6 @@ async def _make_rpc_request(
2374
2361
item_id = request_manager .overwrite_request (
2375
2362
item_id , response ["result" ]
2376
2363
)
2377
- await ws .add_subscription (response ["result" ])
2378
2364
subscription_added = True
2379
2365
except KeyError :
2380
2366
raise SubstrateRequestException (str (response ))
0 commit comments