Skip to content

Commit 6cb12cf

Browse files
committed
Okay. Seems to work well.
1 parent 7d8c5b4 commit 6cb12cf

File tree

1 file changed

+50
-43
lines changed

1 file changed

+50
-43
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -664,30 +664,28 @@ async def shutdown(self):
664664
self._is_closing = False
665665

666666
async def _recv(self, recd) -> None:
667-
try:
668-
if self._log_raw_websockets:
669-
raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd}")
670-
response = json.loads(recd)
671-
self.last_received = await self.loop_time()
672-
if "id" in response:
673-
self._received[response["id"]].set_result(response)
674-
self._in_use_ids.remove(response["id"])
675-
elif "params" in response:
676-
self._received[response["params"]["subscription"]].set_result(response)
677-
else:
678-
raise KeyError(response)
679-
except ssl.SSLError:
680-
raise ConnectionClosed
681-
except (ConnectionClosed, KeyError):
682-
raise
667+
if self._log_raw_websockets:
668+
raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd}")
669+
response = json.loads(recd)
670+
self.last_received = await self.loop_time()
671+
if "id" in response:
672+
self._received[response["id"]].set_result(response)
673+
self._in_use_ids.remove(response["id"])
674+
elif "params" in response:
675+
self._received[response["params"]["subscription"]].set_result(response)
676+
else:
677+
raise KeyError(response)
683678

684679
async def _start_receiving(self, ws: ClientConnection) -> Exception:
685680
try:
686681
async for recd in ws:
687682
await self._recv(recd)
688683
except Exception as e:
684+
if isinstance(e, ssl.SSLError):
685+
e = ConnectionClosed
689686
for i in self._received.keys():
690687
self._received[i].set_exception(e)
688+
self._received[i].cancel()
691689
return
692690

693691
async def _start_sending(self, ws) -> Exception:
@@ -702,9 +700,11 @@ async def _start_sending(self, ws) -> Exception:
702700
except Exception as e:
703701
if to_send is not None:
704702
self._received[to_send["id"]].set_exception(e)
703+
self._received[to_send["id"]].cancel()
705704
else:
706705
for i in self._received.keys():
707706
self._received[i].set_exception(e)
707+
self._received[i].cancel()
708708
return
709709

710710
async def send(self, payload: dict) -> str:
@@ -2270,6 +2270,7 @@ async def _make_rpc_request(
22702270
request_manager = RequestManager(payloads)
22712271

22722272
subscription_added = False
2273+
should_retry = False
22732274

22742275
async with self.ws as ws:
22752276
for payload in payloads:
@@ -2282,37 +2283,43 @@ async def _make_rpc_request(
22822283
item_id not in request_manager.responses
22832284
or asyncio.iscoroutinefunction(result_handler)
22842285
):
2285-
if response := await ws.retrieve(item_id):
2286-
if (
2287-
asyncio.iscoroutinefunction(result_handler)
2288-
and not subscription_added
2289-
):
2290-
# handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2291-
# with {subscription_id : payload_id}
2292-
try:
2293-
item_id = request_manager.overwrite_request(
2294-
item_id, response["result"]
2295-
)
2296-
subscription_added = True
2297-
except KeyError:
2298-
raise SubstrateRequestException(str(response))
2299-
decoded_response, complete = await self._process_response(
2300-
response,
2301-
item_id,
2302-
value_scale_type,
2303-
storage_item,
2304-
result_handler,
2305-
runtime=runtime,
2306-
force_legacy_decode=force_legacy_decode,
2307-
)
2286+
try:
2287+
if response := await ws.retrieve(item_id):
2288+
if (
2289+
asyncio.iscoroutinefunction(result_handler)
2290+
and not subscription_added
2291+
):
2292+
# handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2293+
# with {subscription_id : payload_id}
2294+
try:
2295+
item_id = request_manager.overwrite_request(
2296+
item_id, response["result"]
2297+
)
2298+
subscription_added = True
2299+
except KeyError:
2300+
raise SubstrateRequestException(str(response))
2301+
(
2302+
decoded_response,
2303+
complete,
2304+
) = await self._process_response(
2305+
response,
2306+
item_id,
2307+
value_scale_type,
2308+
storage_item,
2309+
result_handler,
2310+
runtime=runtime,
2311+
force_legacy_decode=force_legacy_decode,
2312+
)
23082313

2309-
request_manager.add_response(
2310-
item_id, decoded_response, complete
2311-
)
2314+
request_manager.add_response(
2315+
item_id, decoded_response, complete
2316+
)
2317+
except ConnectionClosed:
2318+
should_retry = True
23122319

23132320
if request_manager.is_complete:
23142321
break
2315-
if (
2322+
if should_retry or (
23162323
(current_time := await ws.loop_time()) - ws.last_received
23172324
>= self.retry_timeout
23182325
and current_time - ws.last_sent >= self.retry_timeout

0 commit comments

Comments
 (0)