diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e0ca60..138c8a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 1.3.1 /2025-06-11 +* Fixes default vals for archive_nodes by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/134 +* Adds ability to log raw websockets for debugging. by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/133 + + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.3.0...v1.3.1 + ## 1.3.0 /2025-06-10 * Add GH test runner by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/129 diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 1f2d659..598a882 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -32,7 +32,7 @@ MultiAccountId, ) from websockets.asyncio.client import connect -from websockets.exceptions import ConnectionClosed +from websockets.exceptions import ConnectionClosed, WebSocketException from async_substrate_interface.const import SS58_FORMAT from async_substrate_interface.errors import ( @@ -75,6 +75,7 @@ ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]] logger = logging.getLogger("async_substrate_interface") +raw_websocket_logger = logging.getLogger("raw_websocket") class AsyncExtrinsicReceipt: @@ -505,6 +506,7 @@ def __init__( max_connections=100, shutdown_timer=5, options: Optional[dict] = None, + _log_raw_websockets: bool = False, ): """ Websocket manager object. Allows for the use of a single websocket connection by multiple @@ -532,6 +534,10 @@ def __init__( self._exit_task = None self._open_subscriptions = 0 self._options = options if options else {} + self._log_raw_websockets = _log_raw_websockets + self._is_connecting = False + self._is_closing = False + try: now = asyncio.get_running_loop().time() except RuntimeError: @@ -556,38 +562,63 @@ async def __aenter__(self): async def loop_time() -> float: return asyncio.get_running_loop().time() + async def _cancel(self): + try: + self._receiving_task.cancel() + await self._receiving_task + await self.ws.close() + except ( + AttributeError, + asyncio.CancelledError, + WebSocketException, + ): + pass + except Exception as e: + logger.warning( + f"{e} encountered while trying to close websocket connection." + ) + async def connect(self, force=False): - now = await self.loop_time() - self.last_received = now - self.last_sent = now - if self._exit_task: - self._exit_task.cancel() - async with self._lock: - if not self._initialized or force: - try: - self._receiving_task.cancel() - await self._receiving_task - await self.ws.close() - except (AttributeError, asyncio.CancelledError): - pass - self.ws = await asyncio.wait_for( - connect(self.ws_url, **self._options), timeout=10 - ) - self._receiving_task = asyncio.create_task(self._start_receiving()) - self._initialized = True + self._is_connecting = True + try: + now = await self.loop_time() + self.last_received = now + self.last_sent = now + if self._exit_task: + self._exit_task.cancel() + if not self._is_closing: + if not self._initialized or force: + try: + await asyncio.wait_for(self._cancel(), timeout=10.0) + except asyncio.TimeoutError: + pass + + self.ws = await asyncio.wait_for( + connect(self.ws_url, **self._options), timeout=10.0 + ) + self._receiving_task = asyncio.get_running_loop().create_task( + self._start_receiving() + ) + self._initialized = True + finally: + self._is_connecting = False async def __aexit__(self, exc_type, exc_val, exc_tb): - async with self._lock: # TODO is this actually what I want to happen? - self._in_use -= 1 - if self._exit_task is not None: - self._exit_task.cancel() - try: - await self._exit_task - except asyncio.CancelledError: - pass - if self._in_use == 0 and self.ws is not None: - self._open_subscriptions = 0 - self._exit_task = asyncio.create_task(self._exit_with_timer()) + self._is_closing = True + try: + if not self._is_connecting: + self._in_use -= 1 + if self._exit_task is not None: + self._exit_task.cancel() + try: + await self._exit_task + except asyncio.CancelledError: + pass + if self._in_use == 0 and self.ws is not None: + self._open_subscriptions = 0 + self._exit_task = asyncio.create_task(self._exit_with_timer()) + finally: + self._is_closing = False async def _exit_with_timer(self): """ @@ -601,26 +632,24 @@ async def _exit_with_timer(self): pass async def shutdown(self): - async with self._lock: - try: - self._receiving_task.cancel() - await self._receiving_task - await self.ws.close() - except (AttributeError, asyncio.CancelledError): - pass - self.ws = None - self._initialized = False - self._receiving_task = None + self._is_closing = True + try: + await asyncio.wait_for(self._cancel(), timeout=10.0) + except asyncio.TimeoutError: + pass + self.ws = None + self._initialized = False + self._receiving_task = None + self._is_closing = False async def _recv(self) -> None: try: # TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic - response = json.loads(await self.ws.recv(decode=False)) + recd = await self.ws.recv(decode=False) + if self._log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") + response = json.loads(recd) self.last_received = await self.loop_time() - async with self._lock: - # note that these 'subscriptions' are all waiting sent messages which have not received - # responses, and thus are not the same as RPC 'subscriptions', which are unique - self._open_subscriptions -= 1 if "id" in response: self._received[response["id"]] = response self._in_use_ids.remove(response["id"]) @@ -640,8 +669,7 @@ async def _start_receiving(self): except asyncio.CancelledError: pass except ConnectionClosed: - async with self._lock: - await self.connect(force=True) + await self.connect(force=True) async def send(self, payload: dict) -> int: """ @@ -660,12 +688,14 @@ async def send(self, payload: dict) -> int: # self._open_subscriptions += 1 await self.max_subscriptions.acquire() try: - await self.ws.send(json.dumps({**payload, **{"id": original_id}})) + to_send = {**payload, **{"id": original_id}} + if self._log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") + await self.ws.send(json.dumps(to_send)) self.last_sent = await self.loop_time() return original_id except (ConnectionClosed, ssl.SSLError, EOFError): - async with self._lock: - await self.connect(force=True) + await self.connect(force=True) async def retrieve(self, item_id: int) -> Optional[dict]: """ @@ -699,6 +729,8 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, + ws_shutdown_timer: float = 5.0, ): """ The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to @@ -716,6 +748,8 @@ def __init__( max_retries: number of times to retry RPC requests before giving up retry_timeout: how to long wait since the last ping to retry the RPC request _mock: whether to use mock version of the subtensor interface + _log_raw_websockets: whether to log raw websocket requests during RPC requests + ws_shutdown_timer: how long after the last connection your websocket should close """ self.max_retries = max_retries @@ -723,13 +757,16 @@ def __init__( self.chain_endpoint = url self.url = url self._chain = chain_name + self._log_raw_websockets = _log_raw_websockets if not _mock: self.ws = Websocket( url, + _log_raw_websockets=_log_raw_websockets, options={ "max_size": self.ws_max_size, "write_limit": 2**16, }, + shutdown_timer=ws_shutdown_timer, ) else: self.ws = AsyncMock(spec=Websocket) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index c9ca1e8..4f2412f 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -117,9 +117,11 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, archive_nodes: Optional[list[str]] = None, ): fallback_chains = fallback_chains or [] + archive_nodes = archive_nodes or [] self.fallback_chains = ( iter(fallback_chains) if not retry_forever @@ -150,6 +152,7 @@ def __init__( _mock=_mock, retry_timeout=retry_timeout, max_retries=max_retries, + _log_raw_websockets=_log_raw_websockets, ) initialized = True logger.info(f"Connected to {chain_url}") @@ -259,9 +262,12 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, archive_nodes: Optional[list[str]] = None, + ws_shutdown_timer: float = 5.0, ): fallback_chains = fallback_chains or [] + archive_nodes = archive_nodes or [] self.fallback_chains = ( iter(fallback_chains) if not retry_forever @@ -285,6 +291,8 @@ def __init__( _mock=_mock, retry_timeout=retry_timeout, max_retries=max_retries, + _log_raw_websockets=_log_raw_websockets, + ws_shutdown_timer=ws_shutdown_timer, ) self._original_methods = { method: getattr(self, method) for method in RETRY_METHODS @@ -349,7 +357,7 @@ async def _retry(self, method, *args, **kwargs): try: await self._reinstantiate_substrate(e, use_archive=use_archive) return await method_(*args, **kwargs) - except StopAsyncIteration: + except StopIteration: logger.error( f"Max retries exceeded with {self.url}. No more fallback chains." ) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 2697f10..b5148a8 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -53,6 +53,7 @@ ResultHandler = Callable[[dict, Any], tuple[dict, bool]] logger = logging.getLogger("async_substrate_interface") +raw_websocket_logger = logging.getLogger("raw_websocket") class ExtrinsicReceipt: @@ -485,6 +486,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, ): """ The sync compatible version of the subtensor interface commands we use in bittensor. Use this instance only @@ -501,6 +503,7 @@ def __init__( max_retries: number of times to retry RPC requests before giving up retry_timeout: how to long wait since the last ping to retry the RPC request _mock: whether to use mock version of the subtensor interface + _log_raw_websockets: whether to log raw websocket requests during RPC requests """ self.max_retries = max_retries @@ -527,6 +530,7 @@ def __init__( self.registry_type_map = {} self.type_id_to_name = {} self._mock = _mock + self.log_raw_websockets = _log_raw_websockets if not _mock: self.ws = self.connect(init=True) self.initialize() @@ -1831,12 +1835,18 @@ def _make_rpc_request( ws = self.connect(init=False if attempt == 1 else True) for payload in payloads: item_id = get_next_id() - ws.send(json.dumps({**payload["payload"], **{"id": item_id}})) + to_send = {**payload["payload"], **{"id": item_id}} + if self.log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") + ws.send(json.dumps(to_send)) request_manager.add_request(item_id, payload["id"]) while True: try: - response = json.loads(ws.recv(timeout=self.retry_timeout, decode=False)) + recd = ws.recv(timeout=self.retry_timeout, decode=False) + if self.log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") + response = json.loads(recd) except (TimeoutError, ConnectionClosed): if attempt >= self.max_retries: logger.warning( diff --git a/pyproject.toml b/pyproject.toml index c62fb3c..389ea1a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.3.0" +version = "1.3.1" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" } diff --git a/tests/unit_tests/asyncio_/test_substrate_interface.py b/tests/unit_tests/asyncio_/test_substrate_interface.py index ea76595..a64d570 100644 --- a/tests/unit_tests/asyncio_/test_substrate_interface.py +++ b/tests/unit_tests/asyncio_/test_substrate_interface.py @@ -1,3 +1,4 @@ +import asyncio from unittest.mock import AsyncMock, MagicMock import pytest @@ -91,3 +92,22 @@ async def test_runtime_call(monkeypatch): substrate.rpc_request.assert_any_call( "state_call", ["SubstrateApi_SubstrateMethod", "", None] ) + + +@pytest.mark.asyncio +async def test_websocket_shutdown_timer(): + # using default ws shutdown timer of 5.0 seconds + async with AsyncSubstrateInterface("wss://lite.sub.latent.to:443") as substrate: + await substrate.get_chain_head() + await asyncio.sleep(6) + assert ( + substrate.ws._initialized is False + ) # connection should have closed automatically + + # using custom ws shutdown timer of 10.0 seconds + async with AsyncSubstrateInterface( + "wss://lite.sub.latent.to:443", ws_shutdown_timer=10.0 + ) as substrate: + await substrate.get_chain_head() + await asyncio.sleep(6) # same sleep time as before + assert substrate.ws._initialized is True # connection should still be open