From d6aa47d0aa86945b87673b1b1bacf700b51c0370 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 25 Apr 2025 15:54:36 +0200 Subject: [PATCH 01/13] [WIP] moving retry to async-substrate-interface --- async_substrate_interface/types.py | 202 +++++++++++++++++++++++++++++ 1 file changed, 202 insertions(+) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index 754b860..c95db76 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -4,6 +4,8 @@ from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime +from functools import partial +from itertools import cycle from typing import Optional, Union, Any from bt_decode import PortableRegistry, encode as encode_by_type_string @@ -13,6 +15,7 @@ from scalecodec.type_registry import load_type_registry_preset from scalecodec.types import GenericCall, ScaleType +from .errors import MaxRetriesExceeded from .utils import json @@ -358,6 +361,205 @@ def serialize(self): def decode(self): return self.value +SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]] + + +class RetrySubstrate: + def __init__( + self, + substrate: SubstrateClass, + main_url: str, + ss58_format: int, + type_registry: dict, + use_remote_preset: bool, + chain_name: str, + _mock: bool, + fallback_chains: Optional[list[str]] = None, + retry_forever: bool = False, + ): + fallback_chains = fallback_chains or [] + self._substrate_class: SubstrateClass = substrate + self.ss58_format: int = ss58_format + self.type_registry: dict = type_registry + self.use_remote_preset: bool = use_remote_preset + self.chain_name: str = chain_name + self._mock = _mock + self.fallback_chains = ( + iter(fallback_chains) + if not retry_forever + else cycle(fallback_chains + [main_url]) + ) + initialized = False + for chain_url in [main_url] + fallback_chains: + try: + self._substrate = self._substrate_class( + url=chain_url, + ss58_format=ss58_format, + type_registry=type_registry, + use_remote_preset=use_remote_preset, + chain_name=chain_name, + _mock=_mock, + ) + initialized = True + break + except ConnectionError: + logging.warning(f"Unable to connect to {chain_url}") + if not initialized: + raise ConnectionError( + f"Unable to connect at any chains specified: {[main_url]+fallback_chains}" + ) + + # retries + + # TODO: properties that need retry logic + # properties + # version + # token_decimals + # token_symbol + # name + + retry = ( + self._async_retry + if self._substrate_class == AsyncSubstrateInterface + else self._retry + ) + + self._get_block_handler = partial(retry, "_get_block_handler") + self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets") + self.close = partial(retry, "close") + self.compose_call = partial(retry, "compose_call") + self.connect = partial(retry, "connect") + self.create_scale_object = partial(retry, "create_scale_object") + self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic") + self.create_storage_key = partial(retry, "create_storage_key") + self.decode_scale = partial(retry, "decode_scale") + self.encode_scale = partial(retry, "encode_scale") + self.extension_call = partial(retry, "extension_call") + self.filter_events = partial(retry, "filter_events") + self.filter_extrinsics = partial(retry, "filter_extrinsics") + self.generate_signature_payload = partial(retry, "generate_signature_payload") + self.get_account_next_index = partial(retry, "get_account_next_index") + self.get_account_nonce = 
partial(retry, "get_account_nonce") + self.get_block = partial(retry, "get_block") + self.get_block_hash = partial(retry, "get_block_hash") + self.get_block_header = partial(retry, "get_block_header") + self.get_block_metadata = partial(retry, "get_block_metadata") + self.get_block_number = partial(retry, "get_block_number") + self.get_block_runtime_info = partial(retry, "get_block_runtime_info") + self.get_block_runtime_version_for = partial( + retry, "get_block_runtime_version_for" + ) + self.get_block_timestamp = partial(retry, "get_block_timestamp") + self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head") + self.get_chain_head = partial(retry, "get_chain_head") + self.get_constant = partial(retry, "get_constant") + self.get_events = partial(retry, "get_events") + self.get_extrinsics = partial(retry, "get_extrinsics") + self.get_metadata_call_function = partial(retry, "get_metadata_call_function") + self.get_metadata_constant = partial(retry, "get_metadata_constant") + self.get_metadata_error = partial(retry, "get_metadata_error") + self.get_metadata_errors = partial(retry, "get_metadata_errors") + self.get_metadata_module = partial(retry, "get_metadata_module") + self.get_metadata_modules = partial(retry, "get_metadata_modules") + self.get_metadata_runtime_call_function = partial( + retry, "get_metadata_runtime_call_function" + ) + self.get_metadata_runtime_call_functions = partial( + retry, "get_metadata_runtime_call_functions" + ) + self.get_metadata_storage_function = partial( + retry, "get_metadata_storage_function" + ) + self.get_metadata_storage_functions = partial( + retry, "get_metadata_storage_functions" + ) + self.get_parent_block_hash = partial(retry, "get_parent_block_hash") + self.get_payment_info = partial(retry, "get_payment_info") + self.get_storage_item = partial(retry, "get_storage_item") + self.get_type_definition = partial(retry, "get_type_definition") + self.get_type_registry = partial(retry, "get_type_registry") + self.init_runtime = partial(retry, "init_runtime") + self.initialize = partial(retry, "initialize") + self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address") + self.load_runtime = partial(retry, "load_runtime") + self.make_payload = partial(retry, "make_payload") + self.query = partial(retry, "query") + self.query_map = partial(retry, "query_map") + self.query_multi = partial(retry, "query_multi") + self.query_multiple = partial(retry, "query_multiple") + self.reload_type_registry = partial(retry, "reload_type_registry") + self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash") + self.retrieve_extrinsic_by_identifier = partial( + retry, "retrieve_extrinsic_by_identifier" + ) + self.rpc_request = partial(retry, "rpc_request") + self.runtime_call = partial(retry, "runtime_call") + self.search_block_number = partial(retry, "search_block_number") + self.serialize_constant = partial(retry, "serialize_constant") + self.serialize_module_call = partial(retry, "serialize_module_call") + self.serialize_module_error = partial(retry, "serialize_module_error") + self.serialize_module_event = partial(retry, "serialize_module_event") + self.serialize_storage_item = partial(retry, "serialize_storage_item") + self.ss58_decode = partial(retry, "ss58_decode") + self.ss58_encode = partial(retry, "ss58_encode") + self.submit_extrinsic = partial(retry, "submit_extrinsic") + self.subscribe_block_headers = partial(retry, "subscribe_block_headers") + self.supports_rpc_method = partial(retry, "supports_rpc_method") + self.ws 
= self._substrate.ws + + def _retry(self, method, *args, **kwargs): + try: + method_ = getattr(self._substrate, method) + return method_(*args, **kwargs) + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + try: + self._reinstantiate_substrate() + method_ = getattr(self._substrate, method) + return self._retry(method_(*args, **kwargs)) + except StopIteration: + logging.error( + f"Max retries exceeded with {self._substrate.url}. No more fallback chains." + ) + raise MaxRetriesExceeded + + async def _async_retry(self, method, *args, **kwargs): + try: + method_ = getattr(self._substrate, method) + if asyncio.iscoroutinefunction(method_): + return await method_(*args, **kwargs) + else: + return method_(*args, **kwargs) + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + try: + self._reinstantiate_substrate(e) + method_ = getattr(self._substrate, method) + if asyncio.iscoroutinefunction(method_): + return await method_(*args, **kwargs) + else: + return method_(*args, **kwargs) + except StopIteration: + logging.error( + f"Max retries exceeded with {self._substrate.url}. No more fallback chains." + ) + raise MaxRetriesExceeded + + def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: + next_network = next(self.fallback_chains) + if e.__class__ == MaxRetriesExceeded: + logging.error( + f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." + ) + else: + print(f"Connection error. Trying again with {next_network}") + self._substrate = self._substrate_class( + url=next_network, + ss58_format=self.ss58_format, + type_registry=self.type_registry, + use_remote_preset=self.use_remote_preset, + chain_name=self.chain_name, + _mock=self._mock, + ) + class SubstrateMixin(ABC): type_registry_preset = None From 743f708575fde3f6742fa49a63c18878449bc67e Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 25 Apr 2025 17:36:37 +0200 Subject: [PATCH 02/13] moved around some stuff --- async_substrate_interface/substrate_addons.py | 214 ++++++++++++++++++ async_substrate_interface/types.py | 199 ---------------- 2 files changed, 214 insertions(+), 199 deletions(-) create mode 100644 async_substrate_interface/substrate_addons.py diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py new file mode 100644 index 0000000..dedb968 --- /dev/null +++ b/async_substrate_interface/substrate_addons.py @@ -0,0 +1,214 @@ +import asyncio +import logging +from functools import partial +from itertools import cycle +from typing import Optional, Type, Union + +from async_substrate_interface.async_substrate import AsyncSubstrateInterface +from async_substrate_interface.errors import MaxRetriesExceeded +from async_substrate_interface.sync_substrate import SubstrateInterface + +SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]] + + +class RetrySubstrate: + def __init__( + self, + substrate: SubstrateClass, + main_url: str, + use_remote_preset: bool = False, + fallback_chains: Optional[list[str]] = None, + retry_forever: bool = False, + ss58_format: Optional[int] = None, + type_registry: Optional[dict] = None, + type_registry_preset: Optional[str] = None, + chain_name: str = "", + max_retries: int = 5, + retry_timeout: float = 60.0, + _mock: bool = False, + ): + fallback_chains = fallback_chains or [] + self._substrate_class: SubstrateClass = substrate + self.ss58_format: int = ss58_format + self.type_registry: dict = type_registry + self.use_remote_preset: bool 
= use_remote_preset + self.chain_name: Optional[str] = chain_name + self.max_retries: int = max_retries + self.retry_timeout: float = retry_timeout + self._mock = _mock + self.type_registry_preset: Optional[str] = type_registry_preset + self.fallback_chains = ( + iter(fallback_chains) + if not retry_forever + else cycle(fallback_chains + [main_url]) + ) + initialized = False + for chain_url in [main_url] + fallback_chains: + try: + self._substrate = self._substrate_class( + url=chain_url, + ss58_format=ss58_format, + type_registry=type_registry, + use_remote_preset=use_remote_preset, + chain_name=chain_name, + _mock=_mock, + ) + initialized = True + break + except ConnectionError: + logging.warning(f"Unable to connect to {chain_url}") + if not initialized: + raise ConnectionError( + f"Unable to connect at any chains specified: {[main_url]+fallback_chains}" + ) + + # retries + + # TODO: properties that need retry logic + # properties + # version + # token_decimals + # token_symbol + # name + + retry = ( + self._async_retry + if self._substrate_class == AsyncSubstrateInterface + else self._retry + ) + + self._get_block_handler = partial(retry, "_get_block_handler") + self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets") + self.close = partial(retry, "close") + self.compose_call = partial(retry, "compose_call") + self.connect = partial(retry, "connect") + self.create_scale_object = partial(retry, "create_scale_object") + self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic") + self.create_storage_key = partial(retry, "create_storage_key") + self.decode_scale = partial(retry, "decode_scale") + self.encode_scale = partial(retry, "encode_scale") + self.extension_call = partial(retry, "extension_call") + self.filter_events = partial(retry, "filter_events") + self.filter_extrinsics = partial(retry, "filter_extrinsics") + self.generate_signature_payload = partial(retry, "generate_signature_payload") + self.get_account_next_index = partial(retry, "get_account_next_index") + self.get_account_nonce = partial(retry, "get_account_nonce") + self.get_block = partial(retry, "get_block") + self.get_block_hash = partial(retry, "get_block_hash") + self.get_block_header = partial(retry, "get_block_header") + self.get_block_metadata = partial(retry, "get_block_metadata") + self.get_block_number = partial(retry, "get_block_number") + self.get_block_runtime_info = partial(retry, "get_block_runtime_info") + self.get_block_runtime_version_for = partial( + retry, "get_block_runtime_version_for" + ) + self.get_block_timestamp = partial(retry, "get_block_timestamp") + self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head") + self.get_chain_head = partial(retry, "get_chain_head") + self.get_constant = partial(retry, "get_constant") + self.get_events = partial(retry, "get_events") + self.get_extrinsics = partial(retry, "get_extrinsics") + self.get_metadata_call_function = partial(retry, "get_metadata_call_function") + self.get_metadata_constant = partial(retry, "get_metadata_constant") + self.get_metadata_error = partial(retry, "get_metadata_error") + self.get_metadata_errors = partial(retry, "get_metadata_errors") + self.get_metadata_module = partial(retry, "get_metadata_module") + self.get_metadata_modules = partial(retry, "get_metadata_modules") + self.get_metadata_runtime_call_function = partial( + retry, "get_metadata_runtime_call_function" + ) + self.get_metadata_runtime_call_functions = partial( + retry, "get_metadata_runtime_call_functions" + ) + 
self.get_metadata_storage_function = partial( + retry, "get_metadata_storage_function" + ) + self.get_metadata_storage_functions = partial( + retry, "get_metadata_storage_functions" + ) + self.get_parent_block_hash = partial(retry, "get_parent_block_hash") + self.get_payment_info = partial(retry, "get_payment_info") + self.get_storage_item = partial(retry, "get_storage_item") + self.get_type_definition = partial(retry, "get_type_definition") + self.get_type_registry = partial(retry, "get_type_registry") + self.init_runtime = partial(retry, "init_runtime") + self.initialize = partial(retry, "initialize") + self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address") + self.load_runtime = partial(retry, "load_runtime") + self.make_payload = partial(retry, "make_payload") + self.query = partial(retry, "query") + self.query_map = partial(retry, "query_map") + self.query_multi = partial(retry, "query_multi") + self.query_multiple = partial(retry, "query_multiple") + self.reload_type_registry = partial(retry, "reload_type_registry") + self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash") + self.retrieve_extrinsic_by_identifier = partial( + retry, "retrieve_extrinsic_by_identifier" + ) + self.rpc_request = partial(retry, "rpc_request") + self.runtime_call = partial(retry, "runtime_call") + self.search_block_number = partial(retry, "search_block_number") + self.serialize_constant = partial(retry, "serialize_constant") + self.serialize_module_call = partial(retry, "serialize_module_call") + self.serialize_module_error = partial(retry, "serialize_module_error") + self.serialize_module_event = partial(retry, "serialize_module_event") + self.serialize_storage_item = partial(retry, "serialize_storage_item") + self.ss58_decode = partial(retry, "ss58_decode") + self.ss58_encode = partial(retry, "ss58_encode") + self.submit_extrinsic = partial(retry, "submit_extrinsic") + self.subscribe_block_headers = partial(retry, "subscribe_block_headers") + self.supports_rpc_method = partial(retry, "supports_rpc_method") + self.ws = self._substrate.ws + + def _retry(self, method, *args, **kwargs): + try: + method_ = getattr(self._substrate, method) + return method_(*args, **kwargs) + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + try: + self._reinstantiate_substrate(e) + method_ = getattr(self._substrate, method) + return self._retry(method_(*args, **kwargs)) + except StopIteration: + logging.error( + f"Max retries exceeded with {self._substrate.url}. No more fallback chains." + ) + raise MaxRetriesExceeded + + async def _async_retry(self, method, *args, **kwargs): + try: + method_ = getattr(self._substrate, method) + if asyncio.iscoroutinefunction(method_): + return await method_(*args, **kwargs) + else: + return method_(*args, **kwargs) + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + try: + self._reinstantiate_substrate(e) + method_ = getattr(self._substrate, method) + if asyncio.iscoroutinefunction(method_): + return await method_(*args, **kwargs) + else: + return method_(*args, **kwargs) + except StopIteration: + logging.error( + f"Max retries exceeded with {self._substrate.url}. No more fallback chains." + ) + raise MaxRetriesExceeded + + def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: + next_network = next(self.fallback_chains) + if e.__class__ == MaxRetriesExceeded: + logging.error( + f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." 
+ ) + else: + print(f"Connection error. Trying again with {next_network}") + self._substrate = self._substrate_class( + url=next_network, + ss58_format=self.ss58_format, + type_registry=self.type_registry, + use_remote_preset=self.use_remote_preset, + chain_name=self.chain_name, + _mock=self._mock, + ) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index c95db76..e1a3266 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -361,205 +361,6 @@ def serialize(self): def decode(self): return self.value -SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]] - - -class RetrySubstrate: - def __init__( - self, - substrate: SubstrateClass, - main_url: str, - ss58_format: int, - type_registry: dict, - use_remote_preset: bool, - chain_name: str, - _mock: bool, - fallback_chains: Optional[list[str]] = None, - retry_forever: bool = False, - ): - fallback_chains = fallback_chains or [] - self._substrate_class: SubstrateClass = substrate - self.ss58_format: int = ss58_format - self.type_registry: dict = type_registry - self.use_remote_preset: bool = use_remote_preset - self.chain_name: str = chain_name - self._mock = _mock - self.fallback_chains = ( - iter(fallback_chains) - if not retry_forever - else cycle(fallback_chains + [main_url]) - ) - initialized = False - for chain_url in [main_url] + fallback_chains: - try: - self._substrate = self._substrate_class( - url=chain_url, - ss58_format=ss58_format, - type_registry=type_registry, - use_remote_preset=use_remote_preset, - chain_name=chain_name, - _mock=_mock, - ) - initialized = True - break - except ConnectionError: - logging.warning(f"Unable to connect to {chain_url}") - if not initialized: - raise ConnectionError( - f"Unable to connect at any chains specified: {[main_url]+fallback_chains}" - ) - - # retries - - # TODO: properties that need retry logic - # properties - # version - # token_decimals - # token_symbol - # name - - retry = ( - self._async_retry - if self._substrate_class == AsyncSubstrateInterface - else self._retry - ) - - self._get_block_handler = partial(retry, "_get_block_handler") - self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets") - self.close = partial(retry, "close") - self.compose_call = partial(retry, "compose_call") - self.connect = partial(retry, "connect") - self.create_scale_object = partial(retry, "create_scale_object") - self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic") - self.create_storage_key = partial(retry, "create_storage_key") - self.decode_scale = partial(retry, "decode_scale") - self.encode_scale = partial(retry, "encode_scale") - self.extension_call = partial(retry, "extension_call") - self.filter_events = partial(retry, "filter_events") - self.filter_extrinsics = partial(retry, "filter_extrinsics") - self.generate_signature_payload = partial(retry, "generate_signature_payload") - self.get_account_next_index = partial(retry, "get_account_next_index") - self.get_account_nonce = partial(retry, "get_account_nonce") - self.get_block = partial(retry, "get_block") - self.get_block_hash = partial(retry, "get_block_hash") - self.get_block_header = partial(retry, "get_block_header") - self.get_block_metadata = partial(retry, "get_block_metadata") - self.get_block_number = partial(retry, "get_block_number") - self.get_block_runtime_info = partial(retry, "get_block_runtime_info") - self.get_block_runtime_version_for = partial( - retry, "get_block_runtime_version_for" - ) - 
self.get_block_timestamp = partial(retry, "get_block_timestamp") - self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head") - self.get_chain_head = partial(retry, "get_chain_head") - self.get_constant = partial(retry, "get_constant") - self.get_events = partial(retry, "get_events") - self.get_extrinsics = partial(retry, "get_extrinsics") - self.get_metadata_call_function = partial(retry, "get_metadata_call_function") - self.get_metadata_constant = partial(retry, "get_metadata_constant") - self.get_metadata_error = partial(retry, "get_metadata_error") - self.get_metadata_errors = partial(retry, "get_metadata_errors") - self.get_metadata_module = partial(retry, "get_metadata_module") - self.get_metadata_modules = partial(retry, "get_metadata_modules") - self.get_metadata_runtime_call_function = partial( - retry, "get_metadata_runtime_call_function" - ) - self.get_metadata_runtime_call_functions = partial( - retry, "get_metadata_runtime_call_functions" - ) - self.get_metadata_storage_function = partial( - retry, "get_metadata_storage_function" - ) - self.get_metadata_storage_functions = partial( - retry, "get_metadata_storage_functions" - ) - self.get_parent_block_hash = partial(retry, "get_parent_block_hash") - self.get_payment_info = partial(retry, "get_payment_info") - self.get_storage_item = partial(retry, "get_storage_item") - self.get_type_definition = partial(retry, "get_type_definition") - self.get_type_registry = partial(retry, "get_type_registry") - self.init_runtime = partial(retry, "init_runtime") - self.initialize = partial(retry, "initialize") - self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address") - self.load_runtime = partial(retry, "load_runtime") - self.make_payload = partial(retry, "make_payload") - self.query = partial(retry, "query") - self.query_map = partial(retry, "query_map") - self.query_multi = partial(retry, "query_multi") - self.query_multiple = partial(retry, "query_multiple") - self.reload_type_registry = partial(retry, "reload_type_registry") - self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash") - self.retrieve_extrinsic_by_identifier = partial( - retry, "retrieve_extrinsic_by_identifier" - ) - self.rpc_request = partial(retry, "rpc_request") - self.runtime_call = partial(retry, "runtime_call") - self.search_block_number = partial(retry, "search_block_number") - self.serialize_constant = partial(retry, "serialize_constant") - self.serialize_module_call = partial(retry, "serialize_module_call") - self.serialize_module_error = partial(retry, "serialize_module_error") - self.serialize_module_event = partial(retry, "serialize_module_event") - self.serialize_storage_item = partial(retry, "serialize_storage_item") - self.ss58_decode = partial(retry, "ss58_decode") - self.ss58_encode = partial(retry, "ss58_encode") - self.submit_extrinsic = partial(retry, "submit_extrinsic") - self.subscribe_block_headers = partial(retry, "subscribe_block_headers") - self.supports_rpc_method = partial(retry, "supports_rpc_method") - self.ws = self._substrate.ws - - def _retry(self, method, *args, **kwargs): - try: - method_ = getattr(self._substrate, method) - return method_(*args, **kwargs) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: - try: - self._reinstantiate_substrate() - method_ = getattr(self._substrate, method) - return self._retry(method_(*args, **kwargs)) - except StopIteration: - logging.error( - f"Max retries exceeded with {self._substrate.url}. No more fallback chains." 
- ) - raise MaxRetriesExceeded - - async def _async_retry(self, method, *args, **kwargs): - try: - method_ = getattr(self._substrate, method) - if asyncio.iscoroutinefunction(method_): - return await method_(*args, **kwargs) - else: - return method_(*args, **kwargs) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: - try: - self._reinstantiate_substrate(e) - method_ = getattr(self._substrate, method) - if asyncio.iscoroutinefunction(method_): - return await method_(*args, **kwargs) - else: - return method_(*args, **kwargs) - except StopIteration: - logging.error( - f"Max retries exceeded with {self._substrate.url}. No more fallback chains." - ) - raise MaxRetriesExceeded - - def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: - next_network = next(self.fallback_chains) - if e.__class__ == MaxRetriesExceeded: - logging.error( - f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." - ) - else: - print(f"Connection error. Trying again with {next_network}") - self._substrate = self._substrate_class( - url=next_network, - ss58_format=self.ss58_format, - type_registry=self.type_registry, - use_remote_preset=self.use_remote_preset, - chain_name=self.chain_name, - _mock=self._mock, - ) - class SubstrateMixin(ABC): type_registry_preset = None From 5e3d6f576188ea174bb8a1326f8c5db7b1cf5ca0 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 25 Apr 2025 17:36:49 +0200 Subject: [PATCH 03/13] moved around some stuff --- async_substrate_interface/types.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index e1a3266..754b860 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -4,8 +4,6 @@ from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime -from functools import partial -from itertools import cycle from typing import Optional, Union, Any from bt_decode import PortableRegistry, encode as encode_by_type_string @@ -15,7 +13,6 @@ from scalecodec.type_registry import load_type_registry_preset from scalecodec.types import GenericCall, ScaleType -from .errors import MaxRetriesExceeded from .utils import json From be2e1ed86cd1b4245aecc37e68fc5e3f7e33af6b Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 29 Apr 2025 20:47:36 +0200 Subject: [PATCH 04/13] Ruff --- async_substrate_interface/substrate_addons.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index dedb968..362c8b7 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -59,7 +59,7 @@ def __init__( logging.warning(f"Unable to connect to {chain_url}") if not initialized: raise ConnectionError( - f"Unable to connect at any chains specified: {[main_url]+fallback_chains}" + f"Unable to connect at any chains specified: {[main_url] + fallback_chains}" ) # retries From d80c1e62f552d1a35d9d56ac2b897fc63b38a7b9 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 5 May 2025 23:14:07 +0200 Subject: [PATCH 05/13] New direction --- async_substrate_interface/substrate_addons.py | 315 ++++++++++-------- 1 file changed, 175 insertions(+), 140 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 362c8b7..b6d4daa 100644 --- a/async_substrate_interface/substrate_addons.py +++ 
b/async_substrate_interface/substrate_addons.py @@ -2,20 +2,90 @@ import logging from functools import partial from itertools import cycle -from typing import Optional, Type, Union +from typing import Optional from async_substrate_interface.async_substrate import AsyncSubstrateInterface from async_substrate_interface.errors import MaxRetriesExceeded from async_substrate_interface.sync_substrate import SubstrateInterface -SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]] +RETRY_METHODS = [ + "_get_block_handler", + "apply_type_registry_presets", + "close", + "compose_call", + "connect", + "create_scale_object", + "create_signed_extrinsic", + "create_storage_key", + "decode_scale", + "encode_scale", + "extension_call", + "filter_events", + "filter_extrinsics", + "generate_signature_payload", + "get_account_next_index", + "get_account_nonce", + "get_block", + "get_block_hash", + "get_block_header", + "get_block_metadata", + "get_block_number", + "get_block_runtime_info", + "get_block_runtime_version_for", + "get_block_timestamp", + "get_chain_finalised_head", + "get_chain_head", + "get_constant", + "get_events", + "get_extrinsics", + "get_metadata_call_function", + "get_metadata_constant", + "get_metadata_error", + "get_metadata_errors", + "get_metadata_module", + "get_metadata_modules", + "get_metadata_runtime_call_function", + "get_metadata_runtime_call_functions", + "get_metadata_storage_function", + "get_metadata_storage_functions", + "get_parent_block_hash", + "get_payment_info", + "get_storage_item", + "get_type_definition", + "get_type_registry", + "init_runtime", + "initialize", + "is_valid_ss58_address", + "load_runtime", + "make_payload", + "query", + "query_map", + "query_multi", + "query_multiple", + "reload_type_registry", + "retrieve_extrinsic_by_hash", + "retrieve_extrinsic_by_identifier", + "rpc_request", + "runtime_call", + "search_block_number", + "serialize_constant", + "serialize_module_call", + "serialize_module_error", + "serialize_module_event", + "serialize_storage_item", + "ss58_decode", + "ss58_encode", + "submit_extrinsic", + "subscribe_block_headers", + "supports_rpc_method", +] -class RetrySubstrate: + +class RetrySyncSubstrate(SubstrateInterface): def __init__( self, - substrate: SubstrateClass, - main_url: str, + url: str, use_remote_preset: bool = False, fallback_chains: Optional[list[str]] = None, retry_forever: bool = False, @@ -28,30 +98,29 @@ def __init__( _mock: bool = False, ): fallback_chains = fallback_chains or [] - self._substrate_class: SubstrateClass = substrate - self.ss58_format: int = ss58_format - self.type_registry: dict = type_registry - self.use_remote_preset: bool = use_remote_preset - self.chain_name: Optional[str] = chain_name - self.max_retries: int = max_retries - self.retry_timeout: float = retry_timeout - self._mock = _mock - self.type_registry_preset: Optional[str] = type_registry_preset self.fallback_chains = ( iter(fallback_chains) if not retry_forever - else cycle(fallback_chains + [main_url]) + else cycle(fallback_chains + [url]) ) + self.use_remote_preset = use_remote_preset + self.chain_name = chain_name + self._mock = _mock + self.retry_timeout = retry_timeout + self.max_retries = max_retries initialized = False - for chain_url in [main_url] + fallback_chains: + for chain_url in [url] + fallback_chains: try: - self._substrate = self._substrate_class( + super().__init__( url=chain_url, ss58_format=ss58_format, type_registry=type_registry, use_remote_preset=use_remote_preset, + 
type_registry_preset=type_registry_preset, chain_name=chain_name, _mock=_mock, + retry_timeout=retry_timeout, + max_retries=max_retries, ) initialized = True break @@ -59,125 +128,108 @@ def __init__( logging.warning(f"Unable to connect to {chain_url}") if not initialized: raise ConnectionError( - f"Unable to connect at any chains specified: {[main_url] + fallback_chains}" + f"Unable to connect at any chains specified: {[url] + fallback_chains}" ) - - # retries - - # TODO: properties that need retry logic - # properties - # version - # token_decimals - # token_symbol - # name - - retry = ( - self._async_retry - if self._substrate_class == AsyncSubstrateInterface - else self._retry - ) - - self._get_block_handler = partial(retry, "_get_block_handler") - self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets") - self.close = partial(retry, "close") - self.compose_call = partial(retry, "compose_call") - self.connect = partial(retry, "connect") - self.create_scale_object = partial(retry, "create_scale_object") - self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic") - self.create_storage_key = partial(retry, "create_storage_key") - self.decode_scale = partial(retry, "decode_scale") - self.encode_scale = partial(retry, "encode_scale") - self.extension_call = partial(retry, "extension_call") - self.filter_events = partial(retry, "filter_events") - self.filter_extrinsics = partial(retry, "filter_extrinsics") - self.generate_signature_payload = partial(retry, "generate_signature_payload") - self.get_account_next_index = partial(retry, "get_account_next_index") - self.get_account_nonce = partial(retry, "get_account_nonce") - self.get_block = partial(retry, "get_block") - self.get_block_hash = partial(retry, "get_block_hash") - self.get_block_header = partial(retry, "get_block_header") - self.get_block_metadata = partial(retry, "get_block_metadata") - self.get_block_number = partial(retry, "get_block_number") - self.get_block_runtime_info = partial(retry, "get_block_runtime_info") - self.get_block_runtime_version_for = partial( - retry, "get_block_runtime_version_for" - ) - self.get_block_timestamp = partial(retry, "get_block_timestamp") - self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head") - self.get_chain_head = partial(retry, "get_chain_head") - self.get_constant = partial(retry, "get_constant") - self.get_events = partial(retry, "get_events") - self.get_extrinsics = partial(retry, "get_extrinsics") - self.get_metadata_call_function = partial(retry, "get_metadata_call_function") - self.get_metadata_constant = partial(retry, "get_metadata_constant") - self.get_metadata_error = partial(retry, "get_metadata_error") - self.get_metadata_errors = partial(retry, "get_metadata_errors") - self.get_metadata_module = partial(retry, "get_metadata_module") - self.get_metadata_modules = partial(retry, "get_metadata_modules") - self.get_metadata_runtime_call_function = partial( - retry, "get_metadata_runtime_call_function" - ) - self.get_metadata_runtime_call_functions = partial( - retry, "get_metadata_runtime_call_functions" - ) - self.get_metadata_storage_function = partial( - retry, "get_metadata_storage_function" - ) - self.get_metadata_storage_functions = partial( - retry, "get_metadata_storage_functions" - ) - self.get_parent_block_hash = partial(retry, "get_parent_block_hash") - self.get_payment_info = partial(retry, "get_payment_info") - self.get_storage_item = partial(retry, "get_storage_item") - self.get_type_definition = partial(retry, 
"get_type_definition") - self.get_type_registry = partial(retry, "get_type_registry") - self.init_runtime = partial(retry, "init_runtime") - self.initialize = partial(retry, "initialize") - self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address") - self.load_runtime = partial(retry, "load_runtime") - self.make_payload = partial(retry, "make_payload") - self.query = partial(retry, "query") - self.query_map = partial(retry, "query_map") - self.query_multi = partial(retry, "query_multi") - self.query_multiple = partial(retry, "query_multiple") - self.reload_type_registry = partial(retry, "reload_type_registry") - self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash") - self.retrieve_extrinsic_by_identifier = partial( - retry, "retrieve_extrinsic_by_identifier" - ) - self.rpc_request = partial(retry, "rpc_request") - self.runtime_call = partial(retry, "runtime_call") - self.search_block_number = partial(retry, "search_block_number") - self.serialize_constant = partial(retry, "serialize_constant") - self.serialize_module_call = partial(retry, "serialize_module_call") - self.serialize_module_error = partial(retry, "serialize_module_error") - self.serialize_module_event = partial(retry, "serialize_module_event") - self.serialize_storage_item = partial(retry, "serialize_storage_item") - self.ss58_decode = partial(retry, "ss58_decode") - self.ss58_encode = partial(retry, "ss58_encode") - self.submit_extrinsic = partial(retry, "submit_extrinsic") - self.subscribe_block_headers = partial(retry, "subscribe_block_headers") - self.supports_rpc_method = partial(retry, "supports_rpc_method") - self.ws = self._substrate.ws + for method in RETRY_METHODS: + setattr(self, method, partial(self._retry, method)) def _retry(self, method, *args, **kwargs): try: - method_ = getattr(self._substrate, method) + method_ = getattr(self, method) return method_(*args, **kwargs) except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: self._reinstantiate_substrate(e) - method_ = getattr(self._substrate, method) + method_ = getattr(self, method) return self._retry(method_(*args, **kwargs)) except StopIteration: logging.error( - f"Max retries exceeded with {self._substrate.url}. No more fallback chains." + f"Max retries exceeded with {self.url}. No more fallback chains." ) raise MaxRetriesExceeded - async def _async_retry(self, method, *args, **kwargs): + def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: + next_network = next(self.fallback_chains) + if e.__class__ == MaxRetriesExceeded: + logging.error( + f"Max retries exceeded with {self.url}. Retrying with {next_network}." + ) + else: + print(f"Connection error. 
Trying again with {next_network}") + super().__init__( + url=next_network, + ss58_format=self.ss58_format, + type_registry=self.type_registry, + use_remote_preset=self.use_remote_preset, + chain_name=self.chain_name, + _mock=self._mock, + retry_timeout=self.retry_timeout, + max_retries=self.max_retries, + ) + + +class RetryAsyncSubstrate(AsyncSubstrateInterface): + def __init__( + self, + url: str, + use_remote_preset: bool = False, + fallback_chains: Optional[list[str]] = None, + retry_forever: bool = False, + ss58_format: Optional[int] = None, + type_registry: Optional[dict] = None, + type_registry_preset: Optional[str] = None, + chain_name: str = "", + max_retries: int = 5, + retry_timeout: float = 60.0, + _mock: bool = False, + ): + fallback_chains = fallback_chains or [] + self.fallback_chains = ( + iter(fallback_chains) + if not retry_forever + else cycle(fallback_chains + [url]) + ) + self.use_remote_preset = use_remote_preset + self.chain_name = chain_name + self._mock = _mock + self.retry_timeout = retry_timeout + self.max_retries = max_retries + super().__init__( + url=url, + ss58_format=ss58_format, + type_registry=type_registry, + use_remote_preset=use_remote_preset, + type_registry_preset=type_registry_preset, + chain_name=chain_name, + _mock=_mock, + retry_timeout=retry_timeout, + max_retries=max_retries, + ) + for method in RETRY_METHODS: + setattr(self, method, partial(self._retry, method)) + + def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: + next_network = next(self.fallback_chains) + if e.__class__ == MaxRetriesExceeded: + logging.error( + f"Max retries exceeded with {self.url}. Retrying with {next_network}." + ) + else: + print(f"Connection error. Trying again with {next_network}") + super().__init__( + url=next_network, + ss58_format=self.ss58_format, + type_registry=self.type_registry, + use_remote_preset=self.use_remote_preset, + chain_name=self.chain_name, + _mock=self._mock, + retry_timeout=self.retry_timeout, + max_retries=self.max_retries, + ) + + async def _retry(self, method, *args, **kwargs): try: - method_ = getattr(self._substrate, method) + method_ = getattr(self, method) if asyncio.iscoroutinefunction(method_): return await method_(*args, **kwargs) else: @@ -185,30 +237,13 @@ async def _async_retry(self, method, *args, **kwargs): except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: self._reinstantiate_substrate(e) - method_ = getattr(self._substrate, method) + method_ = getattr(self, method) if asyncio.iscoroutinefunction(method_): return await method_(*args, **kwargs) else: return method_(*args, **kwargs) except StopIteration: logging.error( - f"Max retries exceeded with {self._substrate.url}. No more fallback chains." + f"Max retries exceeded with {self.url}. No more fallback chains." ) raise MaxRetriesExceeded - - def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: - next_network = next(self.fallback_chains) - if e.__class__ == MaxRetriesExceeded: - logging.error( - f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}." - ) - else: - print(f"Connection error. 
Trying again with {next_network}") - self._substrate = self._substrate_class( - url=next_network, - ss58_format=self.ss58_format, - type_registry=self.type_registry, - use_remote_preset=self.use_remote_preset, - chain_name=self.chain_name, - _mock=self._mock, - ) From a64def5826665e55f75b9bd447c377783feedbe0 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Mon, 5 May 2025 23:58:12 +0200 Subject: [PATCH 06/13] Add properties as well. --- async_substrate_interface/substrate_addons.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index b6d4daa..2409cf0 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -81,6 +81,8 @@ "supports_rpc_method", ] +RETRY_PROPS = ["properties", "version", "token_decimals", "token_symbol", "name"] + class RetrySyncSubstrate(SubstrateInterface): def __init__( @@ -132,6 +134,8 @@ def __init__( ) for method in RETRY_METHODS: setattr(self, method, partial(self._retry, method)) + for property_ in RETRY_PROPS: + setattr(self, property_, partial(self._retry_property, property_)) def _retry(self, method, *args, **kwargs): try: @@ -148,6 +152,19 @@ def _retry(self, method, *args, **kwargs): ) raise MaxRetriesExceeded + def _retry_property(self, property_): + try: + return getattr(self, property_) + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + try: + self._reinstantiate_substrate(e) + return self._retry_property(property_) + except StopIteration: + logging.error( + f"Max retries exceeded with {self.url}. No more fallback chains." + ) + raise MaxRetriesExceeded + def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: next_network = next(self.fallback_chains) if e.__class__ == MaxRetriesExceeded: @@ -207,6 +224,8 @@ def __init__( ) for method in RETRY_METHODS: setattr(self, method, partial(self._retry, method)) + for property_ in RETRY_PROPS: + setattr(self, property_, partial(self._retry_property, property_)) def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: next_network = next(self.fallback_chains) @@ -237,6 +256,7 @@ async def _retry(self, method, *args, **kwargs): except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: self._reinstantiate_substrate(e) + await self.initialize() method_ = getattr(self, method) if asyncio.iscoroutinefunction(method_): return await method_(*args, **kwargs) @@ -247,3 +267,16 @@ async def _retry(self, method, *args, **kwargs): f"Max retries exceeded with {self.url}. No more fallback chains." ) raise MaxRetriesExceeded + + async def _retry_property(self, property_): + try: + return await getattr(self, property_) + except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + try: + self._reinstantiate_substrate(e) + return await self._retry_property(property_) + except StopIteration: + logging.error( + f"Max retries exceeded with {self.url}. No more fallback chains." + ) + raise MaxRetriesExceeded From 951d558bdda67619d10b8fa8023d72a9c16f9506 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 6 May 2025 17:38:01 +0200 Subject: [PATCH 07/13] Update. 
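The retry wrappers now cache the original bound methods before shadowing them, so a retried call reaches the real implementation instead of re-entering its own wrapper. Sketch of that pattern (class and method names below are illustrative only, not part of the library):

```python
from functools import partial


class RetryWrapperSketch:
    # Illustrative stand-in, not the real interface.
    def get_chain_head(self):
        return "0x00"

    def _reconnect(self):
        pass  # the real classes rotate to the next fallback chain here

    def _retry(self, method, *args, **kwargs):
        original = self._original_methods[method]
        try:
            return original(*args, **kwargs)
        except ConnectionError:
            self._reconnect()
            return original(*args, **kwargs)

    def install_retries(self, method_names):
        # Cache the real bound methods first, then shadow each name with a
        # partial that routes through the retry handler.
        self._original_methods = {n: getattr(self, n) for n in method_names}
        for n in method_names:
            setattr(self, n, partial(self._retry, n))


w = RetryWrapperSketch()
w.install_retries(["get_chain_head"])
print(w.get_chain_head())  # "0x00", returned via the retry wrapper
```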
--- async_substrate_interface/substrate_addons.py | 38 +++++-------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 2409cf0..7eb9628 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -11,18 +11,13 @@ RETRY_METHODS = [ "_get_block_handler", - "apply_type_registry_presets", "close", "compose_call", - "connect", "create_scale_object", "create_signed_extrinsic", "create_storage_key", "decode_scale", "encode_scale", - "extension_call", - "filter_events", - "filter_extrinsics", "generate_signature_payload", "get_account_next_index", "get_account_nonce", @@ -33,7 +28,6 @@ "get_block_number", "get_block_runtime_info", "get_block_runtime_version_for", - "get_block_timestamp", "get_chain_finalised_head", "get_chain_head", "get_constant", @@ -56,26 +50,13 @@ "get_type_registry", "init_runtime", "initialize", - "is_valid_ss58_address", - "load_runtime", - "make_payload", "query", "query_map", "query_multi", "query_multiple", - "reload_type_registry", - "retrieve_extrinsic_by_hash", "retrieve_extrinsic_by_identifier", "rpc_request", "runtime_call", - "search_block_number", - "serialize_constant", - "serialize_module_call", - "serialize_module_error", - "serialize_module_event", - "serialize_storage_item", - "ss58_decode", - "ss58_encode", "submit_extrinsic", "subscribe_block_headers", "supports_rpc_method", @@ -132,14 +113,15 @@ def __init__( raise ConnectionError( f"Unable to connect at any chains specified: {[url] + fallback_chains}" ) + self._original_methods = { + method: getattr(self, method) for method in RETRY_METHODS + } for method in RETRY_METHODS: setattr(self, method, partial(self._retry, method)) - for property_ in RETRY_PROPS: - setattr(self, property_, partial(self._retry_property, property_)) def _retry(self, method, *args, **kwargs): try: - method_ = getattr(self, method) + method_ = self._original_methods[method] return method_(*args, **kwargs) except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: @@ -222,10 +204,11 @@ def __init__( retry_timeout=retry_timeout, max_retries=max_retries, ) + self._original_methods = { + method: getattr(self, method) for method in RETRY_METHODS + } for method in RETRY_METHODS: setattr(self, method, partial(self._retry, method)) - for property_ in RETRY_PROPS: - setattr(self, property_, partial(self._retry_property, property_)) def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: next_network = next(self.fallback_chains) @@ -248,11 +231,8 @@ def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: async def _retry(self, method, *args, **kwargs): try: - method_ = getattr(self, method) - if asyncio.iscoroutinefunction(method_): - return await method_(*args, **kwargs) - else: - return method_(*args, **kwargs) + method_ = self._original_methods[method] + return await method_(*args, **kwargs) except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: try: self._reinstantiate_substrate(e) From 91bfe7a9fa86e09a2d25cc8f96833bb2e2aa93c5 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 6 May 2025 19:17:57 +0200 Subject: [PATCH 08/13] Sync substrate retry working. 
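Rough usage sketch of the sync class (endpoint URLs are placeholders):

```python
from async_substrate_interface.substrate_addons import RetrySyncSubstrate

substrate = RetrySyncSubstrate(
    "wss://entrypoint-finney.opentensor.ai:443",
    fallback_chains=["ws://127.0.0.1:9946"],
)
# Wrapped calls are re-issued against the next fallback chain when the
# current endpoint drops the connection.
print(substrate.get_chain_head())
```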
--- async_substrate_interface/substrate_addons.py | 62 +++++++++++-------- async_substrate_interface/sync_substrate.py | 14 +++-- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 7eb9628..bbca1e8 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -3,11 +3,14 @@ from functools import partial from itertools import cycle from typing import Optional +from websockets.exceptions import ConnectionClosed from async_substrate_interface.async_substrate import AsyncSubstrateInterface from async_substrate_interface.errors import MaxRetriesExceeded from async_substrate_interface.sync_substrate import SubstrateInterface +logger = logging.getLogger("async_substrate_interface") + RETRY_METHODS = [ "_get_block_handler", @@ -106,30 +109,32 @@ def __init__( max_retries=max_retries, ) initialized = True + logger.info(f"Connected to {chain_url}") break except ConnectionError: - logging.warning(f"Unable to connect to {chain_url}") + logger.warning(f"Unable to connect to {chain_url}") if not initialized: raise ConnectionError( f"Unable to connect at any chains specified: {[url] + fallback_chains}" ) + retry_methods = ["connect"] + RETRY_METHODS self._original_methods = { - method: getattr(self, method) for method in RETRY_METHODS + method: getattr(self, method) for method in retry_methods } - for method in RETRY_METHODS: + for method in retry_methods: setattr(self, method, partial(self._retry, method)) def _retry(self, method, *args, **kwargs): try: method_ = self._original_methods[method] return method_(*args, **kwargs) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e: try: self._reinstantiate_substrate(e) - method_ = getattr(self, method) - return self._retry(method_(*args, **kwargs)) + method_ = self._original_methods[method] + return method_(*args, **kwargs) except StopIteration: - logging.error( + logger.error( f"Max retries exceeded with {self.url}. No more fallback chains." ) raise MaxRetriesExceeded @@ -137,34 +142,31 @@ def _retry(self, method, *args, **kwargs): def _retry_property(self, property_): try: return getattr(self, property_) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e: try: self._reinstantiate_substrate(e) return self._retry_property(property_) except StopIteration: - logging.error( + logger.error( f"Max retries exceeded with {self.url}. No more fallback chains." ) raise MaxRetriesExceeded def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: next_network = next(self.fallback_chains) + self.ws.close() if e.__class__ == MaxRetriesExceeded: - logging.error( + logger.error( f"Max retries exceeded with {self.url}. Retrying with {next_network}." ) else: - print(f"Connection error. Trying again with {next_network}") - super().__init__( - url=next_network, - ss58_format=self.ss58_format, - type_registry=self.type_registry, - use_remote_preset=self.use_remote_preset, - chain_name=self.chain_name, - _mock=self._mock, - retry_timeout=self.retry_timeout, - max_retries=self.max_retries, - ) + logger.error(f"Connection error. 
Trying again with {next_network}") + self.url = next_network + self.chain_endpoint = next_network + self.initialized = False + self.ws = self.connect(init=True) + if not self._mock: + self.initialize() class RetryAsyncSubstrate(AsyncSubstrateInterface): @@ -213,7 +215,7 @@ def __init__( def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: next_network = next(self.fallback_chains) if e.__class__ == MaxRetriesExceeded: - logging.error( + logger.error( f"Max retries exceeded with {self.url}. Retrying with {next_network}." ) else: @@ -228,12 +230,22 @@ def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: retry_timeout=self.retry_timeout, max_retries=self.max_retries, ) + self._original_methods = { + method: getattr(self, method) for method in RETRY_METHODS + } + for method in RETRY_METHODS: + setattr(self, method, partial(self._retry, method)) async def _retry(self, method, *args, **kwargs): try: method_ = self._original_methods[method] return await method_(*args, **kwargs) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: + except ( + MaxRetriesExceeded, + ConnectionError, + ConnectionRefusedError, + EOFError, + ) as e: try: self._reinstantiate_substrate(e) await self.initialize() @@ -243,7 +255,7 @@ async def _retry(self, method, *args, **kwargs): else: return method_(*args, **kwargs) except StopIteration: - logging.error( + logger.error( f"Max retries exceeded with {self.url}. No more fallback chains." ) raise MaxRetriesExceeded @@ -256,7 +268,7 @@ async def _retry_property(self, property_): self._reinstantiate_substrate(e) return await self._retry_property(property_) except StopIteration: - logging.error( + logger.error( f"Max retries exceeded with {self.url}. No more fallback chains." ) raise MaxRetriesExceeded diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 21664d4..221a04a 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -1,5 +1,6 @@ import functools import logging +import socket from hashlib import blake2b from typing import Optional, Union, Callable, Any @@ -511,7 +512,6 @@ def __init__( "strict_scale_decode": True, } self.initialized = False - self._forgettable_task = None self.ss58_format = ss58_format self.type_registry = type_registry self.type_registry_preset = type_registry_preset @@ -587,13 +587,19 @@ def name(self): def connect(self, init=False): if init is True: - return connect(self.chain_endpoint, max_size=self.ws_max_size) + try: + return connect(self.chain_endpoint, max_size=self.ws_max_size) + except (ConnectionError, socket.gaierror) as e: + raise ConnectionError(e) else: if not self.ws.close_code: return self.ws else: - self.ws = connect(self.chain_endpoint, max_size=self.ws_max_size) - return self.ws + try: + self.ws = connect(self.chain_endpoint, max_size=self.ws_max_size) + return self.ws + except (ConnectionError, socket.gaierror) as e: + raise ConnectionError(e) def get_storage_item( self, module: str, storage_function: str, block_hash: str = None From 3f50aaff7a5f9faabe41ba71b8bccd3b971c66d5 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 6 May 2025 20:52:42 +0200 Subject: [PATCH 09/13] Async also working. 
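Rough async usage sketch, mirroring the docstring examples (endpoints are placeholders):

```python
import asyncio

from async_substrate_interface.substrate_addons import RetryAsyncSubstrate


async def main():
    substrate = RetryAsyncSubstrate(
        "wss://entrypoint-finney.opentensor.ai:443",
        fallback_chains=["ws://127.0.0.1:9946"],
        retry_forever=True,
    )
    await substrate.initialize()
    # Wrapped calls transparently fail over to the next chain on
    # connection errors.
    print(await substrate.get_chain_head())


asyncio.run(main())
```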
--- async_substrate_interface/async_substrate.py | 17 +- async_substrate_interface/substrate_addons.py | 151 +++++++++++------- 2 files changed, 104 insertions(+), 64 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 8df2341..f10b582 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -734,15 +734,14 @@ async def initialize(self): """ Initialize the connection to the chain. """ - async with self._lock: - self._initializing = True - if not self.initialized: - if not self._chain: - chain = await self.rpc_request("system_chain", []) - self._chain = chain.get("result") - await self.init_runtime() - self.initialized = True - self._initializing = False + self._initializing = True + if not self.initialized: + if not self._chain: + chain = await self.rpc_request("system_chain", []) + self._chain = chain.get("result") + await self.init_runtime() + self.initialized = True + self._initializing = False async def __aexit__(self, exc_type, exc_val, exc_tb): pass diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index bbca1e8..f36670b 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -1,11 +1,18 @@ +""" +A number of "plugins" for SubstrateInterface (and AsyncSubstrateInterface). At initial creation, it contains only +Retry (sync and async versions). +""" + import asyncio import logging +import socket from functools import partial from itertools import cycle from typing import Optional + from websockets.exceptions import ConnectionClosed -from async_substrate_interface.async_substrate import AsyncSubstrateInterface +from async_substrate_interface.async_substrate import AsyncSubstrateInterface, Websocket from async_substrate_interface.errors import MaxRetriesExceeded from async_substrate_interface.sync_substrate import SubstrateInterface @@ -69,6 +76,34 @@ class RetrySyncSubstrate(SubstrateInterface): + """ + A subclass of SubstrateInterface that allows for handling chain failures by using backup chains. If a sustained + network failure is encountered on a chain endpoint, the object will initialize a new connection on the next chain in + the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain in `fallback_chains`, + the connection will attempt to iterate over the list (starting with `url`) again. + + E.g. + ``` + substrate = RetrySyncSubstrate( + "wss://entrypoint-finney.opentensor.ai:443", + fallback_chains=["ws://127.0.0.1:9946"] + ) + ``` + In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this + also fails, a `MaxRetriesExceeded` exception will be raised. + + ``` + substrate = RetrySyncSubstrate( + "wss://entrypoint-finney.opentensor.ai:443", + fallback_chains=["ws://127.0.0.1:9946"], + retry_forever=True + ) + ``` + In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost), + the object will again being to initialize a new connection on entrypoint-finney, and then localhost, and so on and + so forth. 
+ """ + def __init__( self, url: str, @@ -117,6 +152,7 @@ def __init__( raise ConnectionError( f"Unable to connect at any chains specified: {[url] + fallback_chains}" ) + # "connect" is only used by SubstrateInterface, not AsyncSubstrateInterface retry_methods = ["connect"] + RETRY_METHODS self._original_methods = { method: getattr(self, method) for method in retry_methods @@ -125,13 +161,12 @@ def __init__( setattr(self, method, partial(self._retry, method)) def _retry(self, method, *args, **kwargs): + method_ = self._original_methods[method] try: - method_ = self._original_methods[method] return method_(*args, **kwargs) except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e: try: self._reinstantiate_substrate(e) - method_ = self._original_methods[method] return method_(*args, **kwargs) except StopIteration: logger.error( @@ -139,19 +174,6 @@ def _retry(self, method, *args, **kwargs): ) raise MaxRetriesExceeded - def _retry_property(self, property_): - try: - return getattr(self, property_) - except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e: - try: - self._reinstantiate_substrate(e) - return self._retry_property(property_) - except StopIteration: - logger.error( - f"Max retries exceeded with {self.url}. No more fallback chains." - ) - raise MaxRetriesExceeded - def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: next_network = next(self.fallback_chains) self.ws.close() @@ -170,6 +192,34 @@ def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: class RetryAsyncSubstrate(AsyncSubstrateInterface): + """ + A subclass of AsyncSubstrateInterface that allows for handling chain failures by using backup chains. If a + sustained network failure is encountered on a chain endpoint, the object will initialize a new connection on + the next chain in the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain + in `fallback_chains`, the connection will attempt to iterate over the list (starting with `url`) again. + + E.g. + ``` + substrate = RetryAsyncSubstrate( + "wss://entrypoint-finney.opentensor.ai:443", + fallback_chains=["ws://127.0.0.1:9946"] + ) + ``` + In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this + also fails, a `MaxRetriesExceeded` exception will be raised. + + ``` + substrate = RetryAsyncSubstrate( + "wss://entrypoint-finney.opentensor.ai:443", + fallback_chains=["ws://127.0.0.1:9946"], + retry_forever=True + ) + ``` + In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost), + the object will again being to initialize a new connection on entrypoint-finney, and then localhost, and so on and + so forth. + """ + def __init__( self, url: str, @@ -212,62 +262,53 @@ def __init__( for method in RETRY_METHODS: setattr(self, method, partial(self._retry, method)) - def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: + async def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None: next_network = next(self.fallback_chains) if e.__class__ == MaxRetriesExceeded: logger.error( f"Max retries exceeded with {self.url}. Retrying with {next_network}." ) else: - print(f"Connection error. 
Trying again with {next_network}") - super().__init__( - url=next_network, - ss58_format=self.ss58_format, - type_registry=self.type_registry, - use_remote_preset=self.use_remote_preset, - chain_name=self.chain_name, - _mock=self._mock, - retry_timeout=self.retry_timeout, - max_retries=self.max_retries, + logger.error(f"Connection error. Trying again with {next_network}") + try: + await self.ws.shutdown() + except AttributeError: + pass + if self._forgettable_task is not None: + self._forgettable_task: asyncio.Task + self._forgettable_task.cancel() + try: + await self._forgettable_task + except asyncio.CancelledError: + pass + self.chain_endpoint = next_network + self.url = next_network + self.ws = Websocket( + next_network, + options={ + "max_size": self.ws_max_size, + "write_limit": 2**16, + }, ) - self._original_methods = { - method: getattr(self, method) for method in RETRY_METHODS - } - for method in RETRY_METHODS: - setattr(self, method, partial(self._retry, method)) + self._initialized = False + self._initializing = False + await self.initialize() async def _retry(self, method, *args, **kwargs): + method_ = self._original_methods[method] try: - method_ = self._original_methods[method] return await method_(*args, **kwargs) except ( MaxRetriesExceeded, ConnectionError, - ConnectionRefusedError, + ConnectionClosed, EOFError, + socket.gaierror, ) as e: try: - self._reinstantiate_substrate(e) - await self.initialize() - method_ = getattr(self, method) - if asyncio.iscoroutinefunction(method_): - return await method_(*args, **kwargs) - else: - return method_(*args, **kwargs) - except StopIteration: - logger.error( - f"Max retries exceeded with {self.url}. No more fallback chains." - ) - raise MaxRetriesExceeded - - async def _retry_property(self, property_): - try: - return await getattr(self, property_) - except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e: - try: - self._reinstantiate_substrate(e) - return await self._retry_property(property_) - except StopIteration: + await self._reinstantiate_substrate(e) + return await method_(*args, **kwargs) + except StopAsyncIteration: logger.error( f"Max retries exceeded with {self.url}. No more fallback chains." 
) From 767e122570b10a2768f95ee2109ffd15eb53cfd4 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 6 May 2025 23:03:38 +0200 Subject: [PATCH 10/13] [WIP] tests --- tests/conftest.py | 24 +++++++++++++++++++++++ tests/test_substrate_addons.py | 36 ++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 tests/conftest.py create mode 100644 tests/test_substrate_addons.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..16f0050 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,24 @@ +import subprocess +import time + +CONTAINER_NAME_PREFIX = "test_local_chain_" +LOCALNET_IMAGE_NAME = "ghcr.io/opentensor/subtensor-localnet:devnet-ready" + + +def start_docker_container(exposed_port, name_salt: str): + container_name = f"{CONTAINER_NAME_PREFIX}{name_salt}" + + # Command to start container + cmds = [ + "docker", + "run", + "--rm", + "--name", + container_name, + "-p", + f"{exposed_port}:9945", + LOCALNET_IMAGE_NAME, + ] + + proc = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return proc, container_name diff --git a/tests/test_substrate_addons.py b/tests/test_substrate_addons.py new file mode 100644 index 0000000..904f36d --- /dev/null +++ b/tests/test_substrate_addons.py @@ -0,0 +1,36 @@ +import threading +import subprocess + +import pytest +import time + +from async_substrate_interface.substrate_addons import RetrySyncSubstrate +from tests.conftest import start_docker_container + + +@pytest.fixture(scope="function") +def start_containers(): + # Store our subprocesses globally + processes = (start_docker_container(9945, 9945), start_docker_container(9946, 9946)) + yield processes + + # To stop the instances, you can iterate over the processes and kill them: + for process in processes: + subprocess.run(["docker", "kill", process[1]]) + process[0].kill() + + +def test_retry_sync_substrate(start_containers): + container1, container2 = start_containers + time.sleep(10) + with RetrySyncSubstrate( + "ws://127.0.0.1:9945", fallback_chains=["ws://127.0.0.1:9946"] + ) as substrate: + for i in range(10): + assert substrate.get_chain_head().startswith("0x") + if i == 8: + subprocess.run(["docker", "kill", container1[1]]) + time.sleep(10) + if i > 8: + assert substrate.chain_endpoint == "ws://127.0.0.1:9946" + time.sleep(2) From d3f647350c199901c17a0dfe5af5ec848f8e6084 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 6 May 2025 23:24:14 +0200 Subject: [PATCH 11/13] improved test a bit --- tests/conftest.py | 6 ++++-- tests/test_substrate_addons.py | 30 ++++++++++++++++++------------ 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 16f0050..0e312e3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,11 @@ import subprocess -import time +from collections import namedtuple CONTAINER_NAME_PREFIX = "test_local_chain_" LOCALNET_IMAGE_NAME = "ghcr.io/opentensor/subtensor-localnet:devnet-ready" +Container = namedtuple("Container", ["process", "name", "uri"]) + def start_docker_container(exposed_port, name_salt: str): container_name = f"{CONTAINER_NAME_PREFIX}{name_salt}" @@ -21,4 +23,4 @@ def start_docker_container(exposed_port, name_salt: str): ] proc = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return proc, container_name + return Container(proc, container_name, f"ws://127.0.0.1:{exposed_port}") diff --git a/tests/test_substrate_addons.py b/tests/test_substrate_addons.py index 904f36d..9ce3fee 100644 --- 
a/tests/test_substrate_addons.py +++ b/tests/test_substrate_addons.py @@ -9,28 +9,34 @@ @pytest.fixture(scope="function") -def start_containers(): - # Store our subprocesses globally +def docker_containers(): processes = (start_docker_container(9945, 9945), start_docker_container(9946, 9946)) - yield processes + try: + yield processes - # To stop the instances, you can iterate over the processes and kill them: - for process in processes: - subprocess.run(["docker", "kill", process[1]]) - process[0].kill() + finally: + for process in processes: + subprocess.run(["docker", "kill", process[1]]) + process[0].kill() -def test_retry_sync_substrate(start_containers): - container1, container2 = start_containers +def test_retry_sync_substrate(docker_containers): time.sleep(10) with RetrySyncSubstrate( - "ws://127.0.0.1:9945", fallback_chains=["ws://127.0.0.1:9946"] + docker_containers[0].uri, fallback_chains=[docker_containers[1].uri] ) as substrate: for i in range(10): assert substrate.get_chain_head().startswith("0x") if i == 8: - subprocess.run(["docker", "kill", container1[1]]) + subprocess.run(["docker", "stop", docker_containers[0].name]) time.sleep(10) if i > 8: - assert substrate.chain_endpoint == "ws://127.0.0.1:9946" + assert substrate.chain_endpoint == docker_containers[1].uri time.sleep(2) + + +def test_retry_sync_substrate_offline(): + with pytest.raises(ConnectionError): + RetrySyncSubstrate( + "ws://127.0.0.1:9945", fallback_chains=["ws://127.0.0.1:9946"] + ) From 1baab0f43ab5249052ffb6563765008c448e66a4 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 7 May 2025 20:42:46 +0200 Subject: [PATCH 12/13] Add `chain_endpoint` and `url` prior to super init. --- async_substrate_interface/substrate_addons.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index f36670b..be611c4 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -129,9 +129,13 @@ def __init__( self._mock = _mock self.retry_timeout = retry_timeout self.max_retries = max_retries + self.chain_endpoint = url + self.url = url initialized = False for chain_url in [url] + fallback_chains: try: + self.chain_endpoint = chain_url + self.url = chain_url super().__init__( url=chain_url, ss58_format=ss58_format, From c9a13ff41149045f4a4ab170f491df387ce6146a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 7 May 2025 21:43:01 +0200 Subject: [PATCH 13/13] More tests. 
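A possible follow-up for these tests (a sketch only, not part of this patch): the fixed `time.sleep(10)` warm-ups could be replaced with a readiness probe against the container's published port. `wait_for_port` below is a hypothetical helper; a stricter check might issue a real RPC once the socket opens.

```
import socket
import time


# Hypothetical test helper, not part of the patch.
def wait_for_port(host: str, port: int, timeout: float = 60.0) -> None:
    """Block until host:port accepts TCP connections, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return
        except OSError:
            time.sleep(0.5)
    raise TimeoutError(f"{host}:{port} not reachable within {timeout}s")


# e.g. wait_for_port("127.0.0.1", 9945) instead of time.sleep(10)
```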
--- async_substrate_interface/substrate_addons.py | 8 +++- tests/test_substrate_addons.py | 44 ++++++++++++++++--- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index be611c4..5edb26a 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -168,7 +168,13 @@ def _retry(self, method, *args, **kwargs): method_ = self._original_methods[method] try: return method_(*args, **kwargs) - except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e: + except ( + MaxRetriesExceeded, + ConnectionError, + EOFError, + ConnectionClosed, + TimeoutError, + ) as e: try: self._reinstantiate_substrate(e) return method_(*args, **kwargs) diff --git a/tests/test_substrate_addons.py b/tests/test_substrate_addons.py index 9ce3fee..686a028 100644 --- a/tests/test_substrate_addons.py +++ b/tests/test_substrate_addons.py @@ -5,8 +5,11 @@ import time from async_substrate_interface.substrate_addons import RetrySyncSubstrate +from async_substrate_interface.errors import MaxRetriesExceeded from tests.conftest import start_docker_container +LATENT_LITE_ENTRYPOINT = "wss://lite.sub.latent.to:443" + @pytest.fixture(scope="function") def docker_containers(): @@ -16,22 +19,51 @@ def docker_containers(): finally: for process in processes: - subprocess.run(["docker", "kill", process[1]]) - process[0].kill() + subprocess.run(["docker", "kill", process.name]) + process.process.kill() + + +@pytest.fixture(scope="function") +def single_local_chain(): + process = start_docker_container(9945, 9945) + try: + yield process + finally: + print("TRIGGERED KILL") + subprocess.run(["docker", "kill", process.name]) + process.process.kill() -def test_retry_sync_substrate(docker_containers): +def test_retry_sync_substrate(single_local_chain): time.sleep(10) with RetrySyncSubstrate( - docker_containers[0].uri, fallback_chains=[docker_containers[1].uri] + single_local_chain.uri, fallback_chains=[LATENT_LITE_ENTRYPOINT] ) as substrate: for i in range(10): assert substrate.get_chain_head().startswith("0x") if i == 8: - subprocess.run(["docker", "stop", docker_containers[0].name]) - time.sleep(10) + subprocess.run(["docker", "stop", single_local_chain.name]) if i > 8: + assert substrate.chain_endpoint == LATENT_LITE_ENTRYPOINT + time.sleep(2) + + +def test_retry_sync_substrate_max_retries(docker_containers): + time.sleep(10) + with RetrySyncSubstrate( + docker_containers[0].uri, fallback_chains=[docker_containers[1].uri] + ) as substrate: + for i in range(5): + print("EYE EQUALS", i) + assert substrate.get_chain_head().startswith("0x") + if i == 2: + subprocess.run(["docker", "pause", docker_containers[0].name]) + if i == 3: assert substrate.chain_endpoint == docker_containers[1].uri + if i == 4: + subprocess.run(["docker", "pause", docker_containers[1].name]) + with pytest.raises(MaxRetriesExceeded): + substrate.get_chain_head().startswith("0x") time.sleep(2)
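Taken together, both addon classes rely on the same mechanism: capture the original bound methods, rebind each public name to a retry dispatcher via `functools.partial`, and move on to the next fallback chain when a connection-type error escapes a call. A condensed, self-contained sketch of that pattern (illustrative class and method names only; the real classes reconnect websockets and raise `MaxRetriesExceeded`):

```
from functools import partial

RETRY_METHODS = ["get_chain_head"]


class FlakyClient:
    """Stand-in for a substrate client: fails once, then succeeds."""

    def __init__(self):
        self._calls = 0

    def get_chain_head(self):
        self._calls += 1
        if self._calls == 1:
            raise ConnectionError("simulated drop")
        return "0xabc"


class RetryingClient(FlakyClient):
    def __init__(self, fallback_chains):
        super().__init__()
        self.fallback_chains = iter(fallback_chains)
        # Keep the original bound methods, then rebind each name to the dispatcher.
        self._original_methods = {m: getattr(self, m) for m in RETRY_METHODS}
        for m in RETRY_METHODS:
            setattr(self, m, partial(self._retry, m))

    def _reinstantiate(self):
        # The real classes tear down the websocket and reconnect here;
        # this sketch only advances the fallback iterator.
        print(f"switching to {next(self.fallback_chains)}")

    def _retry(self, method, *args, **kwargs):
        method_ = self._original_methods[method]
        try:
            return method_(*args, **kwargs)
        except (ConnectionError, EOFError):
            try:
                self._reinstantiate()
            except StopIteration:
                raise RuntimeError("no more fallback chains") from None
            return method_(*args, **kwargs)


client = RetryingClient(fallback_chains=["ws://127.0.0.1:9946"])
assert client.get_chain_head() == "0xabc"
```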