Skip to content

Commit d6aa47d

Browse files
committed
[WIP] moving retry to async-substrate-interface
1 parent b67bf78 commit d6aa47d

File tree

1 file changed

+202
-0
lines changed

1 file changed

+202
-0
lines changed

async_substrate_interface/types.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from collections.abc import Iterable
55
from dataclasses import dataclass
66
from datetime import datetime
7+
from functools import partial
8+
from itertools import cycle
79
from typing import Optional, Union, Any
810

911
from bt_decode import PortableRegistry, encode as encode_by_type_string
@@ -13,6 +15,7 @@
1315
from scalecodec.type_registry import load_type_registry_preset
1416
from scalecodec.types import GenericCall, ScaleType
1517

18+
from .errors import MaxRetriesExceeded
1619
from .utils import json
1720

1821

@@ -358,6 +361,205 @@ def serialize(self):
358361
def decode(self):
359362
return self.value
360363

364+
SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]]
365+
366+
367+
class RetrySubstrate:
368+
def __init__(
369+
self,
370+
substrate: SubstrateClass,
371+
main_url: str,
372+
ss58_format: int,
373+
type_registry: dict,
374+
use_remote_preset: bool,
375+
chain_name: str,
376+
_mock: bool,
377+
fallback_chains: Optional[list[str]] = None,
378+
retry_forever: bool = False,
379+
):
380+
fallback_chains = fallback_chains or []
381+
self._substrate_class: SubstrateClass = substrate
382+
self.ss58_format: int = ss58_format
383+
self.type_registry: dict = type_registry
384+
self.use_remote_preset: bool = use_remote_preset
385+
self.chain_name: str = chain_name
386+
self._mock = _mock
387+
self.fallback_chains = (
388+
iter(fallback_chains)
389+
if not retry_forever
390+
else cycle(fallback_chains + [main_url])
391+
)
392+
initialized = False
393+
for chain_url in [main_url] + fallback_chains:
394+
try:
395+
self._substrate = self._substrate_class(
396+
url=chain_url,
397+
ss58_format=ss58_format,
398+
type_registry=type_registry,
399+
use_remote_preset=use_remote_preset,
400+
chain_name=chain_name,
401+
_mock=_mock,
402+
)
403+
initialized = True
404+
break
405+
except ConnectionError:
406+
logging.warning(f"Unable to connect to {chain_url}")
407+
if not initialized:
408+
raise ConnectionError(
409+
f"Unable to connect at any chains specified: {[main_url]+fallback_chains}"
410+
)
411+
412+
# retries
413+
414+
# TODO: properties that need retry logic
415+
# properties
416+
# version
417+
# token_decimals
418+
# token_symbol
419+
# name
420+
421+
retry = (
422+
self._async_retry
423+
if self._substrate_class == AsyncSubstrateInterface
424+
else self._retry
425+
)
426+
427+
self._get_block_handler = partial(retry, "_get_block_handler")
428+
self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets")
429+
self.close = partial(retry, "close")
430+
self.compose_call = partial(retry, "compose_call")
431+
self.connect = partial(retry, "connect")
432+
self.create_scale_object = partial(retry, "create_scale_object")
433+
self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic")
434+
self.create_storage_key = partial(retry, "create_storage_key")
435+
self.decode_scale = partial(retry, "decode_scale")
436+
self.encode_scale = partial(retry, "encode_scale")
437+
self.extension_call = partial(retry, "extension_call")
438+
self.filter_events = partial(retry, "filter_events")
439+
self.filter_extrinsics = partial(retry, "filter_extrinsics")
440+
self.generate_signature_payload = partial(retry, "generate_signature_payload")
441+
self.get_account_next_index = partial(retry, "get_account_next_index")
442+
self.get_account_nonce = partial(retry, "get_account_nonce")
443+
self.get_block = partial(retry, "get_block")
444+
self.get_block_hash = partial(retry, "get_block_hash")
445+
self.get_block_header = partial(retry, "get_block_header")
446+
self.get_block_metadata = partial(retry, "get_block_metadata")
447+
self.get_block_number = partial(retry, "get_block_number")
448+
self.get_block_runtime_info = partial(retry, "get_block_runtime_info")
449+
self.get_block_runtime_version_for = partial(
450+
retry, "get_block_runtime_version_for"
451+
)
452+
self.get_block_timestamp = partial(retry, "get_block_timestamp")
453+
self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head")
454+
self.get_chain_head = partial(retry, "get_chain_head")
455+
self.get_constant = partial(retry, "get_constant")
456+
self.get_events = partial(retry, "get_events")
457+
self.get_extrinsics = partial(retry, "get_extrinsics")
458+
self.get_metadata_call_function = partial(retry, "get_metadata_call_function")
459+
self.get_metadata_constant = partial(retry, "get_metadata_constant")
460+
self.get_metadata_error = partial(retry, "get_metadata_error")
461+
self.get_metadata_errors = partial(retry, "get_metadata_errors")
462+
self.get_metadata_module = partial(retry, "get_metadata_module")
463+
self.get_metadata_modules = partial(retry, "get_metadata_modules")
464+
self.get_metadata_runtime_call_function = partial(
465+
retry, "get_metadata_runtime_call_function"
466+
)
467+
self.get_metadata_runtime_call_functions = partial(
468+
retry, "get_metadata_runtime_call_functions"
469+
)
470+
self.get_metadata_storage_function = partial(
471+
retry, "get_metadata_storage_function"
472+
)
473+
self.get_metadata_storage_functions = partial(
474+
retry, "get_metadata_storage_functions"
475+
)
476+
self.get_parent_block_hash = partial(retry, "get_parent_block_hash")
477+
self.get_payment_info = partial(retry, "get_payment_info")
478+
self.get_storage_item = partial(retry, "get_storage_item")
479+
self.get_type_definition = partial(retry, "get_type_definition")
480+
self.get_type_registry = partial(retry, "get_type_registry")
481+
self.init_runtime = partial(retry, "init_runtime")
482+
self.initialize = partial(retry, "initialize")
483+
self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address")
484+
self.load_runtime = partial(retry, "load_runtime")
485+
self.make_payload = partial(retry, "make_payload")
486+
self.query = partial(retry, "query")
487+
self.query_map = partial(retry, "query_map")
488+
self.query_multi = partial(retry, "query_multi")
489+
self.query_multiple = partial(retry, "query_multiple")
490+
self.reload_type_registry = partial(retry, "reload_type_registry")
491+
self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash")
492+
self.retrieve_extrinsic_by_identifier = partial(
493+
retry, "retrieve_extrinsic_by_identifier"
494+
)
495+
self.rpc_request = partial(retry, "rpc_request")
496+
self.runtime_call = partial(retry, "runtime_call")
497+
self.search_block_number = partial(retry, "search_block_number")
498+
self.serialize_constant = partial(retry, "serialize_constant")
499+
self.serialize_module_call = partial(retry, "serialize_module_call")
500+
self.serialize_module_error = partial(retry, "serialize_module_error")
501+
self.serialize_module_event = partial(retry, "serialize_module_event")
502+
self.serialize_storage_item = partial(retry, "serialize_storage_item")
503+
self.ss58_decode = partial(retry, "ss58_decode")
504+
self.ss58_encode = partial(retry, "ss58_encode")
505+
self.submit_extrinsic = partial(retry, "submit_extrinsic")
506+
self.subscribe_block_headers = partial(retry, "subscribe_block_headers")
507+
self.supports_rpc_method = partial(retry, "supports_rpc_method")
508+
self.ws = self._substrate.ws
509+
510+
def _retry(self, method, *args, **kwargs):
511+
try:
512+
method_ = getattr(self._substrate, method)
513+
return method_(*args, **kwargs)
514+
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
515+
try:
516+
self._reinstantiate_substrate()
517+
method_ = getattr(self._substrate, method)
518+
return self._retry(method_(*args, **kwargs))
519+
except StopIteration:
520+
logging.error(
521+
f"Max retries exceeded with {self._substrate.url}. No more fallback chains."
522+
)
523+
raise MaxRetriesExceeded
524+
525+
async def _async_retry(self, method, *args, **kwargs):
526+
try:
527+
method_ = getattr(self._substrate, method)
528+
if asyncio.iscoroutinefunction(method_):
529+
return await method_(*args, **kwargs)
530+
else:
531+
return method_(*args, **kwargs)
532+
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
533+
try:
534+
self._reinstantiate_substrate(e)
535+
method_ = getattr(self._substrate, method)
536+
if asyncio.iscoroutinefunction(method_):
537+
return await method_(*args, **kwargs)
538+
else:
539+
return method_(*args, **kwargs)
540+
except StopIteration:
541+
logging.error(
542+
f"Max retries exceeded with {self._substrate.url}. No more fallback chains."
543+
)
544+
raise MaxRetriesExceeded
545+
546+
def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
547+
next_network = next(self.fallback_chains)
548+
if e.__class__ == MaxRetriesExceeded:
549+
logging.error(
550+
f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}."
551+
)
552+
else:
553+
print(f"Connection error. Trying again with {next_network}")
554+
self._substrate = self._substrate_class(
555+
url=next_network,
556+
ss58_format=self.ss58_format,
557+
type_registry=self.type_registry,
558+
use_remote_preset=self.use_remote_preset,
559+
chain_name=self.chain_name,
560+
_mock=self._mock,
561+
)
562+
361563

362564
class SubstrateMixin(ABC):
363565
type_registry_preset = None

0 commit comments

Comments
 (0)