Skip to content

Commit 743f708

Browse files
committed
moved around some stuff
1 parent d6aa47d commit 743f708

File tree

2 files changed

+214
-199
lines changed

2 files changed

+214
-199
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import asyncio
2+
import logging
3+
from functools import partial
4+
from itertools import cycle
5+
from typing import Optional, Type, Union
6+
7+
from async_substrate_interface.async_substrate import AsyncSubstrateInterface
8+
from async_substrate_interface.errors import MaxRetriesExceeded
9+
from async_substrate_interface.sync_substrate import SubstrateInterface
10+
11+
SubstrateClass = Type[Union[SubstrateInterface, AsyncSubstrateInterface]]
12+
13+
14+
class RetrySubstrate:
15+
def __init__(
16+
self,
17+
substrate: SubstrateClass,
18+
main_url: str,
19+
use_remote_preset: bool = False,
20+
fallback_chains: Optional[list[str]] = None,
21+
retry_forever: bool = False,
22+
ss58_format: Optional[int] = None,
23+
type_registry: Optional[dict] = None,
24+
type_registry_preset: Optional[str] = None,
25+
chain_name: str = "",
26+
max_retries: int = 5,
27+
retry_timeout: float = 60.0,
28+
_mock: bool = False,
29+
):
30+
fallback_chains = fallback_chains or []
31+
self._substrate_class: SubstrateClass = substrate
32+
self.ss58_format: int = ss58_format
33+
self.type_registry: dict = type_registry
34+
self.use_remote_preset: bool = use_remote_preset
35+
self.chain_name: Optional[str] = chain_name
36+
self.max_retries: int = max_retries
37+
self.retry_timeout: float = retry_timeout
38+
self._mock = _mock
39+
self.type_registry_preset: Optional[str] = type_registry_preset
40+
self.fallback_chains = (
41+
iter(fallback_chains)
42+
if not retry_forever
43+
else cycle(fallback_chains + [main_url])
44+
)
45+
initialized = False
46+
for chain_url in [main_url] + fallback_chains:
47+
try:
48+
self._substrate = self._substrate_class(
49+
url=chain_url,
50+
ss58_format=ss58_format,
51+
type_registry=type_registry,
52+
use_remote_preset=use_remote_preset,
53+
chain_name=chain_name,
54+
_mock=_mock,
55+
)
56+
initialized = True
57+
break
58+
except ConnectionError:
59+
logging.warning(f"Unable to connect to {chain_url}")
60+
if not initialized:
61+
raise ConnectionError(
62+
f"Unable to connect at any chains specified: {[main_url]+fallback_chains}"
63+
)
64+
65+
# retries
66+
67+
# TODO: properties that need retry logic
68+
# properties
69+
# version
70+
# token_decimals
71+
# token_symbol
72+
# name
73+
74+
retry = (
75+
self._async_retry
76+
if self._substrate_class == AsyncSubstrateInterface
77+
else self._retry
78+
)
79+
80+
self._get_block_handler = partial(retry, "_get_block_handler")
81+
self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets")
82+
self.close = partial(retry, "close")
83+
self.compose_call = partial(retry, "compose_call")
84+
self.connect = partial(retry, "connect")
85+
self.create_scale_object = partial(retry, "create_scale_object")
86+
self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic")
87+
self.create_storage_key = partial(retry, "create_storage_key")
88+
self.decode_scale = partial(retry, "decode_scale")
89+
self.encode_scale = partial(retry, "encode_scale")
90+
self.extension_call = partial(retry, "extension_call")
91+
self.filter_events = partial(retry, "filter_events")
92+
self.filter_extrinsics = partial(retry, "filter_extrinsics")
93+
self.generate_signature_payload = partial(retry, "generate_signature_payload")
94+
self.get_account_next_index = partial(retry, "get_account_next_index")
95+
self.get_account_nonce = partial(retry, "get_account_nonce")
96+
self.get_block = partial(retry, "get_block")
97+
self.get_block_hash = partial(retry, "get_block_hash")
98+
self.get_block_header = partial(retry, "get_block_header")
99+
self.get_block_metadata = partial(retry, "get_block_metadata")
100+
self.get_block_number = partial(retry, "get_block_number")
101+
self.get_block_runtime_info = partial(retry, "get_block_runtime_info")
102+
self.get_block_runtime_version_for = partial(
103+
retry, "get_block_runtime_version_for"
104+
)
105+
self.get_block_timestamp = partial(retry, "get_block_timestamp")
106+
self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head")
107+
self.get_chain_head = partial(retry, "get_chain_head")
108+
self.get_constant = partial(retry, "get_constant")
109+
self.get_events = partial(retry, "get_events")
110+
self.get_extrinsics = partial(retry, "get_extrinsics")
111+
self.get_metadata_call_function = partial(retry, "get_metadata_call_function")
112+
self.get_metadata_constant = partial(retry, "get_metadata_constant")
113+
self.get_metadata_error = partial(retry, "get_metadata_error")
114+
self.get_metadata_errors = partial(retry, "get_metadata_errors")
115+
self.get_metadata_module = partial(retry, "get_metadata_module")
116+
self.get_metadata_modules = partial(retry, "get_metadata_modules")
117+
self.get_metadata_runtime_call_function = partial(
118+
retry, "get_metadata_runtime_call_function"
119+
)
120+
self.get_metadata_runtime_call_functions = partial(
121+
retry, "get_metadata_runtime_call_functions"
122+
)
123+
self.get_metadata_storage_function = partial(
124+
retry, "get_metadata_storage_function"
125+
)
126+
self.get_metadata_storage_functions = partial(
127+
retry, "get_metadata_storage_functions"
128+
)
129+
self.get_parent_block_hash = partial(retry, "get_parent_block_hash")
130+
self.get_payment_info = partial(retry, "get_payment_info")
131+
self.get_storage_item = partial(retry, "get_storage_item")
132+
self.get_type_definition = partial(retry, "get_type_definition")
133+
self.get_type_registry = partial(retry, "get_type_registry")
134+
self.init_runtime = partial(retry, "init_runtime")
135+
self.initialize = partial(retry, "initialize")
136+
self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address")
137+
self.load_runtime = partial(retry, "load_runtime")
138+
self.make_payload = partial(retry, "make_payload")
139+
self.query = partial(retry, "query")
140+
self.query_map = partial(retry, "query_map")
141+
self.query_multi = partial(retry, "query_multi")
142+
self.query_multiple = partial(retry, "query_multiple")
143+
self.reload_type_registry = partial(retry, "reload_type_registry")
144+
self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash")
145+
self.retrieve_extrinsic_by_identifier = partial(
146+
retry, "retrieve_extrinsic_by_identifier"
147+
)
148+
self.rpc_request = partial(retry, "rpc_request")
149+
self.runtime_call = partial(retry, "runtime_call")
150+
self.search_block_number = partial(retry, "search_block_number")
151+
self.serialize_constant = partial(retry, "serialize_constant")
152+
self.serialize_module_call = partial(retry, "serialize_module_call")
153+
self.serialize_module_error = partial(retry, "serialize_module_error")
154+
self.serialize_module_event = partial(retry, "serialize_module_event")
155+
self.serialize_storage_item = partial(retry, "serialize_storage_item")
156+
self.ss58_decode = partial(retry, "ss58_decode")
157+
self.ss58_encode = partial(retry, "ss58_encode")
158+
self.submit_extrinsic = partial(retry, "submit_extrinsic")
159+
self.subscribe_block_headers = partial(retry, "subscribe_block_headers")
160+
self.supports_rpc_method = partial(retry, "supports_rpc_method")
161+
self.ws = self._substrate.ws
162+
163+
def _retry(self, method, *args, **kwargs):
164+
try:
165+
method_ = getattr(self._substrate, method)
166+
return method_(*args, **kwargs)
167+
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
168+
try:
169+
self._reinstantiate_substrate(e)
170+
method_ = getattr(self._substrate, method)
171+
return self._retry(method_(*args, **kwargs))
172+
except StopIteration:
173+
logging.error(
174+
f"Max retries exceeded with {self._substrate.url}. No more fallback chains."
175+
)
176+
raise MaxRetriesExceeded
177+
178+
async def _async_retry(self, method, *args, **kwargs):
179+
try:
180+
method_ = getattr(self._substrate, method)
181+
if asyncio.iscoroutinefunction(method_):
182+
return await method_(*args, **kwargs)
183+
else:
184+
return method_(*args, **kwargs)
185+
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
186+
try:
187+
self._reinstantiate_substrate(e)
188+
method_ = getattr(self._substrate, method)
189+
if asyncio.iscoroutinefunction(method_):
190+
return await method_(*args, **kwargs)
191+
else:
192+
return method_(*args, **kwargs)
193+
except StopIteration:
194+
logging.error(
195+
f"Max retries exceeded with {self._substrate.url}. No more fallback chains."
196+
)
197+
raise MaxRetriesExceeded
198+
199+
def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
200+
next_network = next(self.fallback_chains)
201+
if e.__class__ == MaxRetriesExceeded:
202+
logging.error(
203+
f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}."
204+
)
205+
else:
206+
print(f"Connection error. Trying again with {next_network}")
207+
self._substrate = self._substrate_class(
208+
url=next_network,
209+
ss58_format=self.ss58_format,
210+
type_registry=self.type_registry,
211+
use_remote_preset=self.use_remote_preset,
212+
chain_name=self.chain_name,
213+
_mock=self._mock,
214+
)

0 commit comments

Comments
 (0)