Skip to content

Commit 07663a3

Browse files
committed
WIP check-in
1 parent 8f54649 commit 07663a3

File tree

1 file changed

+43
-66
lines changed

1 file changed

+43
-66
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 43 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ async def _cancel(self):
604604
)
605605

606606
async def connect(self, force=False):
607-
# TODO after connecting, move from _inflight to the queue
607+
logger.debug("Connecting.")
608608
now = await self.loop_time()
609609
self.last_received = now
610610
self.last_sent = now
@@ -620,25 +620,30 @@ async def connect(self, force=False):
620620
self.ws = await asyncio.wait_for(
621621
connect(self.ws_url, **self._options), timeout=10.0
622622
)
623+
logger.debug("Connected.")
623624
if self._send_recv_task is None or self._send_recv_task.done():
624625
self._send_recv_task = asyncio.get_running_loop().create_task(
625626
self._handler(self.ws)
626627
)
628+
logger.debug("Recd task started.")
627629
self._initialized = True
628630

629631
async def _handler(self, ws: ClientConnection) -> None:
630632
recv_task = asyncio.create_task(self._start_receiving(ws))
631633
send_task = asyncio.create_task(self._start_sending(ws))
634+
logger.debug("Starting send/recv tasks.")
632635
done, pending = await asyncio.wait(
633636
[recv_task, send_task],
634637
return_when=asyncio.FIRST_COMPLETED,
635638
)
639+
logger.debug("send/recv tasks done.")
636640
loop = asyncio.get_running_loop()
637641
should_reconnect = False
638642
for task in pending:
639643
task.cancel()
640-
if isinstance(task.exception(), asyncio.TimeoutError):
641-
should_reconnect = True
644+
if isinstance(recv_task.exception(), asyncio.TimeoutError):
645+
# TODO check the logic here
646+
should_reconnect = True
642647
if should_reconnect is True:
643648
for original_id, payload in list(self._inflight.items()):
644649
self._received[original_id] = loop.create_future()
@@ -648,7 +653,6 @@ async def _handler(self, ws: ClientConnection) -> None:
648653
await self.connect(True)
649654
await self._handler(ws=ws)
650655

651-
652656
async def __aexit__(self, exc_type, exc_val, exc_tb):
653657
if not self.state != State.CONNECTING:
654658
if self._exit_task is not None:
@@ -705,6 +709,8 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception:
705709
if self._inflight:
706710
recd = await asyncio.wait_for(ws.recv(decode=False), timeout=self.retry_timeout)
707711
await self._recv(recd)
712+
else:
713+
await asyncio.sleep(0.1)
708714
except Exception as e:
709715
if isinstance(e, ssl.SSLError):
710716
e = ConnectionClosed
@@ -719,6 +725,7 @@ async def _start_sending(self, ws) -> Exception:
719725
try:
720726
while True:
721727
to_send_ = await self._sending.get()
728+
logger.debug(f"Pulled {to_send_} from the queue")
722729
send_id = to_send_["id"]
723730
to_send = json.dumps(to_send_)
724731
async with self._lock:
@@ -751,6 +758,7 @@ async def send(self, payload: dict) -> str:
751758
id: the internal ID of the request (incremented int)
752759
"""
753760
await self.max_subscriptions.acquire()
761+
logger.debug(f"Sending payload: {payload}")
754762
async with self._lock:
755763
original_id = get_next_id()
756764
while original_id in self._in_use_ids:
@@ -759,6 +767,7 @@ async def send(self, payload: dict) -> str:
759767
self._received[original_id] = asyncio.get_running_loop().create_future()
760768
to_send = {**payload, **{"id": original_id}}
761769
await self._sending.put(to_send)
770+
logger.debug("767 queue put")
762771
return original_id
763772

764773
async def retrieve(self, item_id: str) -> Optional[dict]:
@@ -2320,7 +2329,6 @@ async def _make_rpc_request(
23202329
# TODO use that to determine when it's completed. But how would this work with subscriptions?
23212330

23222331
subscription_added = False
2323-
should_retry = False
23242332

23252333
async with self.ws as ws:
23262334
for payload in payloads:
@@ -2333,71 +2341,40 @@ async def _make_rpc_request(
23332341
item_id not in request_manager.responses
23342342
or asyncio.iscoroutinefunction(result_handler)
23352343
):
2336-
try:
2337-
if response := await ws.retrieve(item_id):
2338-
if (
2339-
asyncio.iscoroutinefunction(result_handler)
2340-
and not subscription_added
2341-
):
2342-
# handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2343-
# with {subscription_id : payload_id}
2344-
try:
2345-
item_id = request_manager.overwrite_request(
2346-
item_id, response["result"]
2347-
)
2348-
await ws.add_subscription(response["result"])
2349-
subscription_added = True
2350-
except KeyError:
2351-
raise SubstrateRequestException(str(response))
2352-
(
2353-
decoded_response,
2354-
complete,
2355-
) = await self._process_response(
2356-
response,
2357-
item_id,
2358-
value_scale_type,
2359-
storage_item,
2360-
result_handler,
2361-
runtime=runtime,
2362-
force_legacy_decode=force_legacy_decode,
2363-
)
2344+
if response := await ws.retrieve(item_id):
2345+
if (
2346+
asyncio.iscoroutinefunction(result_handler)
2347+
and not subscription_added
2348+
):
2349+
# handles subscriptions, overwrites the previous mapping of {item_id : payload_id}
2350+
# with {subscription_id : payload_id}
2351+
try:
2352+
item_id = request_manager.overwrite_request(
2353+
item_id, response["result"]
2354+
)
2355+
await ws.add_subscription(response["result"])
2356+
subscription_added = True
2357+
except KeyError:
2358+
raise SubstrateRequestException(str(response))
2359+
(
2360+
decoded_response,
2361+
complete,
2362+
) = await self._process_response(
2363+
response,
2364+
item_id,
2365+
value_scale_type,
2366+
storage_item,
2367+
result_handler,
2368+
runtime=runtime,
2369+
force_legacy_decode=force_legacy_decode,
2370+
)
23642371

2365-
request_manager.add_response(
2366-
item_id, decoded_response, complete
2367-
)
2368-
except ConnectionClosed:
2369-
should_retry = True
2372+
request_manager.add_response(
2373+
item_id, decoded_response, complete
2374+
)
23702375

23712376
if request_manager.is_complete:
23722377
break
2373-
# TODO I sometimes get timeouts immediately. Why?
2374-
if should_retry or (
2375-
(current_time := await ws.loop_time()) - ws.last_received
2376-
>= self.retry_timeout
2377-
and current_time - ws.last_sent >= self.retry_timeout
2378-
):
2379-
# TODO this retry logic should really live inside the Websocket
2380-
if attempt >= self.max_retries:
2381-
logger.error(
2382-
f"Timed out waiting for RPC requests {attempt} times. Exiting."
2383-
)
2384-
raise MaxRetriesExceeded("Max retries reached.")
2385-
else:
2386-
self.ws.last_received = await ws.loop_time()
2387-
await self.ws.connect(force=True)
2388-
logger.warning(
2389-
f"Timed out waiting for RPC requests. "
2390-
f"Retrying attempt {attempt + 1} of {self.max_retries}"
2391-
)
2392-
return await self._make_rpc_request(
2393-
payloads=payloads,
2394-
value_scale_type=value_scale_type,
2395-
storage_item=storage_item,
2396-
result_handler=result_handler,
2397-
attempt=attempt + 1,
2398-
runtime=runtime,
2399-
force_legacy_decode=force_legacy_decode,
2400-
)
24012378

24022379
return request_manager.get_results()
24032380

0 commit comments

Comments
 (0)