diff --git a/CHANGELOG.md b/CHANGELOG.md index 41791c7..179f423 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 1.5.1 /2025-08-05 +* query multiple/decoding fix by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/168 +* Fix reconnection logic by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/169 + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.0...v1.5.1 + ## 1.5.0 /2025-08-04 * ConcurrencyError fix by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/162 * Added better typing by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/163 diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index c827377..7ffb78b 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -526,6 +526,7 @@ def __init__( options: Optional[dict] = None, _log_raw_websockets: bool = False, retry_timeout: float = 60.0, + max_retries: int = 5, ): """ Websocket manager object. Allows for the use of a single websocket connection by multiple @@ -536,6 +537,10 @@ def __init__( max_subscriptions: Maximum number of subscriptions per websocket connection max_connections: Maximum number of connections total shutdown_timer: Number of seconds to shut down websocket connection after last use + options: Options to pass to the websocket connection + _log_raw_websockets: Whether to log raw websockets in the "raw_websocket" logger + retry_timeout: Timeout in seconds to retry websocket connection + max_retries: Maximum number of retries following a timeout """ # TODO allow setting max concurrent connections and rpc subscriptions per connection self.ws_url = ws_url @@ -555,6 +560,7 @@ def __init__( self._options = options if options else {} self._log_raw_websockets = _log_raw_websockets self._in_use_ids = set() + self._max_retries = max_retries @property def state(self): @@ -575,7 +581,6 @@ async def loop_time() -> float: async def _cancel(self): try: self._send_recv_task.cancel() - await self._send_recv_task await self.ws.close() except ( AttributeError, @@ -616,19 +621,30 @@ async def _handler(self, ws: ClientConnection) -> None: ) loop = asyncio.get_running_loop() should_reconnect = False + is_retry = False for task in pending: task.cancel() for task in done: - if isinstance(task.result(), (asyncio.TimeoutError, ConnectionClosed)): + task_res = task.result() + if isinstance( + task_res, (asyncio.TimeoutError, ConnectionClosed, TimeoutError) + ): should_reconnect = True + if isinstance(task_res, (asyncio.TimeoutError, TimeoutError)): + self._attempts += 1 + is_retry = True if should_reconnect is True: for original_id, payload in list(self._inflight.items()): self._received[original_id] = loop.create_future() to_send = json.loads(payload) await self._sending.put(to_send) - logger.info("Timeout occurred. Reconnecting.") + if is_retry: + # Otherwise the connection was just closed due to no activity, which should not count against retries + logger.info( + f"Timeout occurred. Reconnecting. Attempt {self._attempts} of {self._max_retries}" + ) await self.connect(True) - await self._handler(ws=ws) + await self._handler(ws=self.ws) elif isinstance(e := recv_task.result(), Exception): return e elif isinstance(e := send_task.result(), Exception): @@ -689,15 +705,22 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: recd = await asyncio.wait_for( ws.recv(decode=False), timeout=self.retry_timeout ) + # reset the counter once we successfully receive something back + self._attempts = 0 await self._recv(recd) except Exception as e: - logger.exception("Start receiving exception", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed - for fut in self._received.values(): - if not fut.done(): - fut.set_exception(e) - fut.cancel() + if not isinstance( + e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed) + ): + logger.exception("Websocket receiving exception", exc_info=e) + for fut in self._received.values(): + if not fut.done(): + fut.set_exception(e) + fut.cancel() + else: + logger.debug("Timeout occurred. Reconnecting.") return e async def _start_sending(self, ws) -> Exception: @@ -713,14 +736,21 @@ async def _start_sending(self, ws) -> Exception: raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") await ws.send(to_send) except Exception as e: - logger.exception("Start sending exception", exc_info=e) - if to_send is not None: - self._received[to_send["id"]].set_exception(e) - self._received[to_send["id"]].cancel() + if isinstance(e, ssl.SSLError): + e = ConnectionClosed + if not isinstance( + e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed) + ): + logger.exception("Websocket sending exception", exc_info=e) + if to_send is not None: + self._received[to_send["id"]].set_exception(e) + self._received[to_send["id"]].cancel() + else: + for i in self._received.keys(): + self._received[i].set_exception(e) + self._received[i].cancel() else: - for i in self._received.keys(): - self._received[i].set_exception(e) - self._received[i].cancel() + logger.debug("Timeout occurred. Reconnecting.") return e async def send(self, payload: dict) -> str: @@ -784,9 +814,9 @@ async def retrieve(self, item_id: str) -> Optional[dict]: if item is not None: if item.done(): self.max_subscriptions.release() + res = item.result() del self._received[item_id] - - return item.result() + return res else: try: return self._received_subscriptions[item_id].get_nowait() @@ -860,6 +890,7 @@ def __init__( }, shutdown_timer=ws_shutdown_timer, retry_timeout=self.retry_timeout, + max_retries=max_retries, ) else: self.ws = AsyncMock(spec=Websocket) @@ -1165,7 +1196,7 @@ async def get_runtime_for_version( async def _get_runtime_for_version( self, runtime_version: int, block_hash: Optional[str] = None ) -> Runtime: - runtime_config = RuntimeConfigurationObject() + runtime_config = RuntimeConfigurationObject(ss58_format=self.ss58_format) runtime_config.clear_type_registry() runtime_config.update_type_registry(load_type_registry_preset(name="core")) @@ -2337,7 +2368,7 @@ async def _make_rpc_request( request_manager.add_request(item_id, payload["id"]) while True: - for item_id in list(request_manager.response_map.keys()): + for item_id in request_manager.unresponded(): if ( item_id not in request_manager.responses or asyncio.iscoroutinefunction(result_handler) @@ -2368,7 +2399,6 @@ async def _make_rpc_request( runtime=runtime, force_legacy_decode=force_legacy_decode, ) - request_manager.add_response( item_id, decoded_response, complete ) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 44fd158..692db3b 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -1924,7 +1924,7 @@ def _make_rpc_request( _received[response["params"]["subscription"]] = response else: raise SubstrateRequestException(response) - for item_id in list(request_manager.response_map.keys()): + for item_id in request_manager.unresponded(): if item_id not in request_manager.responses or isinstance( result_handler, Callable ): diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index f1efbc3..ecacca8 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -418,6 +418,15 @@ def get_results(self) -> RequestResults: request_id: info["results"] for request_id, info in self.responses.items() } + def unresponded(self): + """ + Yields items from response_map whose corresponding response is missing or incomplete. + """ + for item_id, request_id in list(self.response_map.items()): + response_info = self.responses.get(request_id) + if response_info is None or not response_info["complete"]: + yield item_id + @dataclass class Preprocessed: diff --git a/pyproject.toml b/pyproject.toml index bdddd5c..0e9acb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.5.0" +version = "1.5.1" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" } diff --git a/tests/integration_tests/test_async_substrate_interface.py b/tests/integration_tests/test_async_substrate_interface.py index 2c50213..8f3c3f6 100644 --- a/tests/integration_tests/test_async_substrate_interface.py +++ b/tests/integration_tests/test_async_substrate_interface.py @@ -1,3 +1,4 @@ +import asyncio import time import pytest @@ -126,9 +127,37 @@ async def test_get_events_proper_decoding(): async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate: all_events = await substrate.get_events(block_hash=block_hash) event = all_events[1] - print(type(event["attributes"])) assert event["attributes"] == ( "5G1NjW9YhXLadMWajvTkfcJy6up3yH2q1YzMXDTi6ijanChe", 30, "0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5", ) + + +@pytest.mark.asyncio +async def test_query_multiple(): + block = 6153277 + cks = [ + "5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt", + "5GQxLKxjZWNZDsghmYcw7P6ahC7XJCjx1WD94WGh92quSycx", + "5EcaPiDT1cv951SkCFsvdHDs2yAEUWhJDuRP9mHb343WnaVn", + ] + async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate: + block_hash = await substrate.get_block_hash(block_id=block) + assert await substrate.query_multiple( + params=cks, + module="SubtensorModule", + storage_function="OwnedHotkeys", + block_hash=block_hash, + ) + + +@pytest.mark.asyncio +async def test_reconnection(): + async with AsyncSubstrateInterface( + ARCHIVE_ENTRYPOINT, ss58_format=42, retry_timeout=8.0 + ) as substrate: + await asyncio.sleep(9) # sleep for longer than the retry timeout + bh = await substrate.get_chain_finalised_head() + assert isinstance(bh, str) + assert isinstance(await substrate.get_block_number(bh), int) diff --git a/tests/integration_tests/test_substrate_interface.py b/tests/integration_tests/test_substrate_interface.py index be4eb29..27e134a 100644 --- a/tests/integration_tests/test_substrate_interface.py +++ b/tests/integration_tests/test_substrate_interface.py @@ -78,9 +78,25 @@ def test_get_events_proper_decoding(): with SubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate: all_events = substrate.get_events(block_hash=block_hash) event = all_events[1] - print(type(event["attributes"])) assert event["attributes"] == ( "5G1NjW9YhXLadMWajvTkfcJy6up3yH2q1YzMXDTi6ijanChe", 30, "0xa6b4e5c8241d60ece0c25056b19f7d21ae845269fc771ad46bf3e011865129a5", ) + + +def test_query_multiple(): + block = 6153277 + cks = [ + "5FH9AQM4kqbkdC9jyV5FrdEWVYt41nkhFstop7Vhyfb9ZsXt", + "5GQxLKxjZWNZDsghmYcw7P6ahC7XJCjx1WD94WGh92quSycx", + "5EcaPiDT1cv951SkCFsvdHDs2yAEUWhJDuRP9mHb343WnaVn", + ] + with SubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate: + block_hash = substrate.get_block_hash(block_id=block) + assert substrate.query_multiple( + params=cks, + module="SubtensorModule", + storage_function="OwnedHotkeys", + block_hash=block_hash, + ) diff --git a/tests/unit_tests/asyncio_/test_substrate_interface.py b/tests/unit_tests/asyncio_/test_substrate_interface.py index 817cdf3..d4d692f 100644 --- a/tests/unit_tests/asyncio_/test_substrate_interface.py +++ b/tests/unit_tests/asyncio_/test_substrate_interface.py @@ -3,6 +3,7 @@ import pytest from websockets.exceptions import InvalidURI +from websockets.protocol import State from async_substrate_interface.async_substrate import AsyncSubstrateInterface from async_substrate_interface.types import ScaleObj @@ -103,9 +104,9 @@ async def test_websocket_shutdown_timer(): async with AsyncSubstrateInterface("wss://lite.sub.latent.to:443") as substrate: await substrate.get_chain_head() await asyncio.sleep(6) - assert ( - substrate.ws._initialized is False - ) # connection should have closed automatically + assert ( + substrate.ws.state is State.CLOSED + ) # connection should have closed automatically # using custom ws shutdown timer of 10.0 seconds async with AsyncSubstrateInterface( @@ -113,7 +114,7 @@ async def test_websocket_shutdown_timer(): ) as substrate: await substrate.get_chain_head() await asyncio.sleep(6) # same sleep time as before - assert substrate.ws._initialized is True # connection should still be open + assert substrate.ws.state is State.OPEN # connection should still be open @pytest.mark.asyncio