From 10d34437e098375ee0fdc0e3baaf60f2ebd427c8 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 00:24:25 +0200 Subject: [PATCH 1/8] Adds ability to log raw websockets for debugging. --- async_substrate_interface/substrate_addons.py | 2 ++ async_substrate_interface/sync_substrate.py | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index c9ca1e8..7c54373 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -117,6 +117,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, archive_nodes: Optional[list[str]] = None, ): fallback_chains = fallback_chains or [] @@ -150,6 +151,7 @@ def __init__( _mock=_mock, retry_timeout=retry_timeout, max_retries=max_retries, + _log_raw_websockets=_log_raw_websockets, ) initialized = True logger.info(f"Connected to {chain_url}") diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 2697f10..b5148a8 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -53,6 +53,7 @@ ResultHandler = Callable[[dict, Any], tuple[dict, bool]] logger = logging.getLogger("async_substrate_interface") +raw_websocket_logger = logging.getLogger("raw_websocket") class ExtrinsicReceipt: @@ -485,6 +486,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, ): """ The sync compatible version of the subtensor interface commands we use in bittensor. Use this instance only @@ -501,6 +503,7 @@ def __init__( max_retries: number of times to retry RPC requests before giving up retry_timeout: how to long wait since the last ping to retry the RPC request _mock: whether to use mock version of the subtensor interface + _log_raw_websockets: whether to log raw websocket requests during RPC requests """ self.max_retries = max_retries @@ -527,6 +530,7 @@ def __init__( self.registry_type_map = {} self.type_id_to_name = {} self._mock = _mock + self.log_raw_websockets = _log_raw_websockets if not _mock: self.ws = self.connect(init=True) self.initialize() @@ -1831,12 +1835,18 @@ def _make_rpc_request( ws = self.connect(init=False if attempt == 1 else True) for payload in payloads: item_id = get_next_id() - ws.send(json.dumps({**payload["payload"], **{"id": item_id}})) + to_send = {**payload["payload"], **{"id": item_id}} + if self.log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") + ws.send(json.dumps(to_send)) request_manager.add_request(item_id, payload["id"]) while True: try: - response = json.loads(ws.recv(timeout=self.retry_timeout, decode=False)) + recd = ws.recv(timeout=self.retry_timeout, decode=False) + if self.log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") + response = json.loads(recd) except (TimeoutError, ConnectionClosed): if attempt >= self.max_retries: logger.warning( From 607fd0a24ab3113926698c22eccf0b721207924f Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 16:02:03 +0200 Subject: [PATCH 2/8] Fixes default vals for archive_nodes --- async_substrate_interface/substrate_addons.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index c9ca1e8..8d19fc7 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -120,6 +120,7 @@ def __init__( archive_nodes: Optional[list[str]] = None, ): fallback_chains = fallback_chains or [] + archive_nodes = archive_nodes or [] self.fallback_chains = ( iter(fallback_chains) if not retry_forever @@ -262,6 +263,7 @@ def __init__( archive_nodes: Optional[list[str]] = None, ): fallback_chains = fallback_chains or [] + archive_nodes = archive_nodes or [] self.fallback_chains = ( iter(fallback_chains) if not retry_forever From 62e00a13a2d0774c46dd7de4c0b2f456fa624bf0 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 18:02:27 +0200 Subject: [PATCH 3/8] Applied raw websocket logger to async substrate interface as well. --- async_substrate_interface/async_substrate.py | 18 ++++++++++++++++-- async_substrate_interface/substrate_addons.py | 2 ++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 1f2d659..b839a5f 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -75,6 +75,7 @@ ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]] logger = logging.getLogger("async_substrate_interface") +raw_websocket_logger = logging.getLogger("raw_websocket") class AsyncExtrinsicReceipt: @@ -505,6 +506,7 @@ def __init__( max_connections=100, shutdown_timer=5, options: Optional[dict] = None, + _log_raw_websockets: bool = False, ): """ Websocket manager object. Allows for the use of a single websocket connection by multiple @@ -532,6 +534,8 @@ def __init__( self._exit_task = None self._open_subscriptions = 0 self._options = options if options else {} + self._log_raw_websockets = _log_raw_websockets + try: now = asyncio.get_running_loop().time() except RuntimeError: @@ -615,7 +619,10 @@ async def shutdown(self): async def _recv(self) -> None: try: # TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic - response = json.loads(await self.ws.recv(decode=False)) + recd = await self.ws.recv(decode=False) + if self._log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") + response = json.loads() self.last_received = await self.loop_time() async with self._lock: # note that these 'subscriptions' are all waiting sent messages which have not received @@ -660,7 +667,10 @@ async def send(self, payload: dict) -> int: # self._open_subscriptions += 1 await self.max_subscriptions.acquire() try: - await self.ws.send(json.dumps({**payload, **{"id": original_id}})) + to_send = {**payload, **{"id": original_id}} + if self._log_raw_websockets: + raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") + await self.ws.send(json.dumps(to_send)) self.last_sent = await self.loop_time() return original_id except (ConnectionClosed, ssl.SSLError, EOFError): @@ -699,6 +709,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, ): """ The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to @@ -716,6 +727,7 @@ def __init__( max_retries: number of times to retry RPC requests before giving up retry_timeout: how to long wait since the last ping to retry the RPC request _mock: whether to use mock version of the subtensor interface + _log_raw_websockets: whether to log raw websocket requests during RPC requests """ self.max_retries = max_retries @@ -723,9 +735,11 @@ def __init__( self.chain_endpoint = url self.url = url self._chain = chain_name + self._log_raw_websockets = _log_raw_websockets if not _mock: self.ws = Websocket( url, + _log_raw_websockets=_log_raw_websockets, options={ "max_size": self.ws_max_size, "write_limit": 2**16, diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 25c5ba2..7ec19b4 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -262,6 +262,7 @@ def __init__( max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, + _log_raw_websockets: bool = False, archive_nodes: Optional[list[str]] = None, ): fallback_chains = fallback_chains or [] @@ -289,6 +290,7 @@ def __init__( _mock=_mock, retry_timeout=retry_timeout, max_retries=max_retries, + _log_raw_websockets=_log_raw_websockets, ) self._original_methods = { method: getattr(self, method) for method in RETRY_METHODS From c5731894c6bd74f315b48655da4498c0ced85c53 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 18:07:43 +0200 Subject: [PATCH 4/8] Typo --- async_substrate_interface/async_substrate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index b839a5f..609b046 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -622,7 +622,7 @@ async def _recv(self) -> None: recd = await self.ws.recv(decode=False) if self._log_raw_websockets: raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") - response = json.loads() + response = json.loads(recd) self.last_received = await self.loop_time() async with self._lock: # note that these 'subscriptions' are all waiting sent messages which have not received From 26a2a7e78aac45ff08c7485507981c6b111df82c Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 19:10:29 +0200 Subject: [PATCH 5/8] Updates version and changelog --- CHANGELOG.md | 7 +++++++ pyproject.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e0ca60..138c8a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 1.3.1 /2025-06-11 +* Fixes default vals for archive_nodes by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/134 +* Adds ability to log raw websockets for debugging. by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/133 + + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.3.0...v1.3.1 + ## 1.3.0 /2025-06-10 * Add GH test runner by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/129 diff --git a/pyproject.toml b/pyproject.toml index c62fb3c..389ea1a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.3.0" +version = "1.3.1" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" } From 8bd9214003c7d5bc057b58e536fc99fbe33b398a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 19:35:37 +0200 Subject: [PATCH 6/8] StopAsyncIteration should be StopIteration, as it's iterating over the chain endpoints, not async iterating. --- async_substrate_interface/substrate_addons.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 7ec19b4..d076104 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -355,7 +355,7 @@ async def _retry(self, method, *args, **kwargs): try: await self._reinstantiate_substrate(e, use_archive=use_archive) return await method_(*args, **kwargs) - except StopAsyncIteration: + except StopIteration: logger.error( f"Max retries exceeded with {self.url}. No more fallback chains." ) From 3b85edc8150f009ad0699323d49bc470e33ab984 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 21:51:58 +0200 Subject: [PATCH 7/8] Adds stability to async websockets. --- async_substrate_interface/async_substrate.py | 119 +++++++++++------- async_substrate_interface/substrate_addons.py | 2 + 2 files changed, 73 insertions(+), 48 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 609b046..598a882 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -32,7 +32,7 @@ MultiAccountId, ) from websockets.asyncio.client import connect -from websockets.exceptions import ConnectionClosed +from websockets.exceptions import ConnectionClosed, WebSocketException from async_substrate_interface.const import SS58_FORMAT from async_substrate_interface.errors import ( @@ -535,6 +535,8 @@ def __init__( self._open_subscriptions = 0 self._options = options if options else {} self._log_raw_websockets = _log_raw_websockets + self._is_connecting = False + self._is_closing = False try: now = asyncio.get_running_loop().time() @@ -560,38 +562,63 @@ async def __aenter__(self): async def loop_time() -> float: return asyncio.get_running_loop().time() + async def _cancel(self): + try: + self._receiving_task.cancel() + await self._receiving_task + await self.ws.close() + except ( + AttributeError, + asyncio.CancelledError, + WebSocketException, + ): + pass + except Exception as e: + logger.warning( + f"{e} encountered while trying to close websocket connection." + ) + async def connect(self, force=False): - now = await self.loop_time() - self.last_received = now - self.last_sent = now - if self._exit_task: - self._exit_task.cancel() - async with self._lock: - if not self._initialized or force: - try: - self._receiving_task.cancel() - await self._receiving_task - await self.ws.close() - except (AttributeError, asyncio.CancelledError): - pass - self.ws = await asyncio.wait_for( - connect(self.ws_url, **self._options), timeout=10 - ) - self._receiving_task = asyncio.create_task(self._start_receiving()) - self._initialized = True + self._is_connecting = True + try: + now = await self.loop_time() + self.last_received = now + self.last_sent = now + if self._exit_task: + self._exit_task.cancel() + if not self._is_closing: + if not self._initialized or force: + try: + await asyncio.wait_for(self._cancel(), timeout=10.0) + except asyncio.TimeoutError: + pass + + self.ws = await asyncio.wait_for( + connect(self.ws_url, **self._options), timeout=10.0 + ) + self._receiving_task = asyncio.get_running_loop().create_task( + self._start_receiving() + ) + self._initialized = True + finally: + self._is_connecting = False async def __aexit__(self, exc_type, exc_val, exc_tb): - async with self._lock: # TODO is this actually what I want to happen? - self._in_use -= 1 - if self._exit_task is not None: - self._exit_task.cancel() - try: - await self._exit_task - except asyncio.CancelledError: - pass - if self._in_use == 0 and self.ws is not None: - self._open_subscriptions = 0 - self._exit_task = asyncio.create_task(self._exit_with_timer()) + self._is_closing = True + try: + if not self._is_connecting: + self._in_use -= 1 + if self._exit_task is not None: + self._exit_task.cancel() + try: + await self._exit_task + except asyncio.CancelledError: + pass + if self._in_use == 0 and self.ws is not None: + self._open_subscriptions = 0 + self._exit_task = asyncio.create_task(self._exit_with_timer()) + finally: + self._is_closing = False async def _exit_with_timer(self): """ @@ -605,16 +632,15 @@ async def _exit_with_timer(self): pass async def shutdown(self): - async with self._lock: - try: - self._receiving_task.cancel() - await self._receiving_task - await self.ws.close() - except (AttributeError, asyncio.CancelledError): - pass - self.ws = None - self._initialized = False - self._receiving_task = None + self._is_closing = True + try: + await asyncio.wait_for(self._cancel(), timeout=10.0) + except asyncio.TimeoutError: + pass + self.ws = None + self._initialized = False + self._receiving_task = None + self._is_closing = False async def _recv(self) -> None: try: @@ -624,10 +650,6 @@ async def _recv(self) -> None: raw_websocket_logger.debug(f"WEBSOCKET_RECEIVE> {recd.decode()}") response = json.loads(recd) self.last_received = await self.loop_time() - async with self._lock: - # note that these 'subscriptions' are all waiting sent messages which have not received - # responses, and thus are not the same as RPC 'subscriptions', which are unique - self._open_subscriptions -= 1 if "id" in response: self._received[response["id"]] = response self._in_use_ids.remove(response["id"]) @@ -647,8 +669,7 @@ async def _start_receiving(self): except asyncio.CancelledError: pass except ConnectionClosed: - async with self._lock: - await self.connect(force=True) + await self.connect(force=True) async def send(self, payload: dict) -> int: """ @@ -674,8 +695,7 @@ async def send(self, payload: dict) -> int: self.last_sent = await self.loop_time() return original_id except (ConnectionClosed, ssl.SSLError, EOFError): - async with self._lock: - await self.connect(force=True) + await self.connect(force=True) async def retrieve(self, item_id: int) -> Optional[dict]: """ @@ -710,6 +730,7 @@ def __init__( retry_timeout: float = 60.0, _mock: bool = False, _log_raw_websockets: bool = False, + ws_shutdown_timer: float = 5.0, ): """ The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to @@ -728,6 +749,7 @@ def __init__( retry_timeout: how to long wait since the last ping to retry the RPC request _mock: whether to use mock version of the subtensor interface _log_raw_websockets: whether to log raw websocket requests during RPC requests + ws_shutdown_timer: how long after the last connection your websocket should close """ self.max_retries = max_retries @@ -744,6 +766,7 @@ def __init__( "max_size": self.ws_max_size, "write_limit": 2**16, }, + shutdown_timer=ws_shutdown_timer, ) else: self.ws = AsyncMock(spec=Websocket) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 7ec19b4..578eb80 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -264,6 +264,7 @@ def __init__( _mock: bool = False, _log_raw_websockets: bool = False, archive_nodes: Optional[list[str]] = None, + ws_shutdown_timer: float = 5.0, ): fallback_chains = fallback_chains or [] archive_nodes = archive_nodes or [] @@ -291,6 +292,7 @@ def __init__( retry_timeout=retry_timeout, max_retries=max_retries, _log_raw_websockets=_log_raw_websockets, + ws_shutdown_timer=ws_shutdown_timer, ) self._original_methods = { method: getattr(self, method) for method in RETRY_METHODS From 0d264ba2a363ae3abe27e7ff14fb6936f05fbf05 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 11 Jun 2025 23:24:46 +0200 Subject: [PATCH 8/8] Add test for ws shutdown timer. --- .../asyncio_/test_substrate_interface.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/unit_tests/asyncio_/test_substrate_interface.py b/tests/unit_tests/asyncio_/test_substrate_interface.py index ea76595..a64d570 100644 --- a/tests/unit_tests/asyncio_/test_substrate_interface.py +++ b/tests/unit_tests/asyncio_/test_substrate_interface.py @@ -1,3 +1,4 @@ +import asyncio from unittest.mock import AsyncMock, MagicMock import pytest @@ -91,3 +92,22 @@ async def test_runtime_call(monkeypatch): substrate.rpc_request.assert_any_call( "state_call", ["SubstrateApi_SubstrateMethod", "", None] ) + + +@pytest.mark.asyncio +async def test_websocket_shutdown_timer(): + # using default ws shutdown timer of 5.0 seconds + async with AsyncSubstrateInterface("wss://lite.sub.latent.to:443") as substrate: + await substrate.get_chain_head() + await asyncio.sleep(6) + assert ( + substrate.ws._initialized is False + ) # connection should have closed automatically + + # using custom ws shutdown timer of 10.0 seconds + async with AsyncSubstrateInterface( + "wss://lite.sub.latent.to:443", ws_shutdown_timer=10.0 + ) as substrate: + await substrate.get_chain_head() + await asyncio.sleep(6) # same sleep time as before + assert substrate.ws._initialized is True # connection should still be open