Skip to content

Commit a1e8e29

Browse files
committed
Async seems to be working
1 parent 54ee09d commit a1e8e29

File tree

3 files changed

+148
-98
lines changed

3 files changed

+148
-98
lines changed

bittensor/core/async_subtensor.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
)
8080
from bittensor.core.metagraph import AsyncMetagraph
8181
from bittensor.core.settings import version_as_int, TYPE_REGISTRY
82-
from bittensor.core.types import ParamWithTypes, SubtensorMixin
82+
from bittensor.core.types import ParamWithTypes, SubtensorMixin, RetrySubstrate
8383
from bittensor.utils import (
8484
Certificate,
8585
decode_hex_identity_dict,
@@ -90,6 +90,7 @@
9090
u16_normalized_float,
9191
u64_normalized_float,
9292
unlock_key,
93+
determine_chain_endpoint_and_network,
9394
)
9495
from bittensor.utils.balance import (
9596
Balance,
@@ -115,6 +116,8 @@ def __init__(
115116
config: Optional["Config"] = None,
116117
_mock: bool = False,
117118
log_verbose: bool = False,
119+
fallback_chains: Optional[list[str]] = None,
120+
retry_forever: bool = False,
118121
):
119122
"""
120123
Initializes an instance of the AsyncSubtensor class.
@@ -124,17 +127,23 @@ def __init__(
124127
config (Optional[Config]): Configuration object for the AsyncSubtensor instance.
125128
_mock: Whether this is a mock instance. Mainly just for use in testing.
126129
log_verbose (bool): Enables or disables verbose logging.
130+
fallback_chains: list of chain urls to try if the initial one fails
131+
retry_forever: whether to continuously try the chains indefinitely if timeout failure
127132
128133
Raises:
129134
Any exceptions raised during the setup, configuration, or connection process.
130135
"""
136+
fallback_chains_ = fallback_chains or []
131137
if config is None:
132138
config = AsyncSubtensor.config()
133139
self._config = copy.deepcopy(config)
134140
self.chain_endpoint, self.network = AsyncSubtensor.setup_config(
135141
network, self._config
136142
)
137143
self._mock = _mock
144+
fallback_chain_urls = [
145+
determine_chain_endpoint_and_network(x)[1] for x in fallback_chains_
146+
]
138147

139148
self.log_verbose = log_verbose
140149
self._check_and_log_network_settings()
@@ -143,13 +152,16 @@ def __init__(
143152
f"Connecting to network: [blue]{self.network}[/blue], "
144153
f"chain_endpoint: [blue]{self.chain_endpoint}[/blue]..."
145154
)
146-
self.substrate = AsyncSubstrateInterface(
147-
url=self.chain_endpoint,
155+
self.substrate = RetrySubstrate(
156+
AsyncSubstrateInterface,
157+
main_url=self.chain_endpoint,
148158
ss58_format=SS58_FORMAT,
149159
type_registry=TYPE_REGISTRY,
150160
use_remote_preset=True,
151161
chain_name="Bittensor",
152162
_mock=_mock,
163+
fallback_chains=fallback_chain_urls,
164+
retry_forever=retry_forever,
153165
)
154166
if self.log_verbose:
155167
logging.info(

bittensor/core/axon.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -504,9 +504,9 @@ def verify_custom(synapse: MyCustomSynapse):
504504
)
505505

506506
param_class = first_param.annotation
507-
assert issubclass(param_class, Synapse), (
508-
"The first argument of forward_fn must inherit from bittensor.Synapse"
509-
)
507+
assert issubclass(
508+
param_class, Synapse
509+
), "The first argument of forward_fn must inherit from bittensor.Synapse"
510510
request_name = param_class.__name__
511511

512512
async def endpoint(*args, **kwargs):
@@ -580,19 +580,19 @@ async def endpoint(*args, **kwargs):
580580
blacklist_sig = Signature(
581581
expected_params, return_annotation=Tuple[bool, str]
582582
)
583-
assert signature(blacklist_fn) == blacklist_sig, (
584-
f"The blacklist_fn function must have the signature: blacklist( synapse: {request_name} ) -> tuple[bool, str]"
585-
)
583+
assert (
584+
signature(blacklist_fn) == blacklist_sig
585+
), f"The blacklist_fn function must have the signature: blacklist( synapse: {request_name} ) -> tuple[bool, str]"
586586
if priority_fn:
587587
priority_sig = Signature(expected_params, return_annotation=float)
588-
assert signature(priority_fn) == priority_sig, (
589-
f"The priority_fn function must have the signature: priority( synapse: {request_name} ) -> float"
590-
)
588+
assert (
589+
signature(priority_fn) == priority_sig
590+
), f"The priority_fn function must have the signature: priority( synapse: {request_name} ) -> float"
591591
if verify_fn:
592592
verify_sig = Signature(expected_params, return_annotation=None)
593-
assert signature(verify_fn) == verify_sig, (
594-
f"The verify_fn function must have the signature: verify( synapse: {request_name} ) -> None"
595-
)
593+
assert (
594+
signature(verify_fn) == verify_sig
595+
), f"The verify_fn function must have the signature: verify( synapse: {request_name} ) -> None"
596596

597597
# Store functions in appropriate attribute dictionaries
598598
self.forward_class_types[request_name] = param_class
@@ -747,9 +747,9 @@ def check_config(cls, config: "Config"):
747747
Raises:
748748
AssertionError: If the axon or external ports are not in range [1024, 65535]
749749
"""
750-
assert 1024 < config.axon.port < 65535, (
751-
"Axon port must be in range [1024, 65535]"
752-
)
750+
assert (
751+
1024 < config.axon.port < 65535
752+
), "Axon port must be in range [1024, 65535]"
753753

754754
assert config.axon.external_port is None or (
755755
1024 < config.axon.external_port < 65535

bittensor/core/types.py

Lines changed: 118 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from abc import ABC
23
import argparse
34
from functools import partial
@@ -31,6 +32,7 @@ def __init__(
3132
fallback_chains: Optional[list[str]] = None,
3233
retry_forever: bool = False,
3334
):
35+
fallback_chains = fallback_chains or []
3436
self._substrate_class: SubstrateClass = substrate
3537
self.ss58_format: int = ss58_format
3638
self.type_registry: dict = type_registry
@@ -56,7 +58,7 @@ def __init__(
5658
initialized = True
5759
break
5860
except ConnectionError:
59-
continue
61+
logging.warning(f"Unable to connect to {chain_url}")
6062
if not initialized:
6163
raise ConnectionError(
6264
f"Unable to connect at any chains specified: {[main_url]+fallback_chains}"
@@ -71,107 +73,140 @@ def __init__(
7173
# token_symbol
7274
# name
7375

74-
self._get_block_handler = partial(self._retry, "_get_block_handler")
75-
self.apply_type_registry_presets = partial(
76-
self._retry, "apply_type_registry_presets"
76+
retry = (
77+
self._async_retry
78+
if self._substrate_class == AsyncSubstrateInterface
79+
else self._retry
7780
)
78-
self.close = partial(self._retry, "close")
79-
self.compose_call = partial(self._retry, "compose_call")
80-
self.connect = partial(self._retry, "connect")
81-
self.create_scale_object = partial(self._retry, "create_scale_object")
82-
self.create_signed_extrinsic = partial(self._retry, "create_signed_extrinsic")
83-
self.create_storage_key = partial(self._retry, "create_storage_key")
84-
self.decode_scale = partial(self._retry, "decode_scale")
85-
self.encode_scale = partial(self._retry, "encode_scale")
86-
self.extension_call = partial(self._retry, "extension_call")
87-
self.filter_events = partial(self._retry, "filter_events")
88-
self.filter_extrinsics = partial(self._retry, "filter_extrinsics")
89-
self.generate_signature_payload = partial(
90-
self._retry, "generate_signature_payload"
91-
)
92-
self.get_account_next_index = partial(self._retry, "get_account_next_index")
93-
self.get_account_nonce = partial(self._retry, "get_account_nonce")
94-
self.get_block = partial(self._retry, "get_block")
95-
self.get_block_hash = partial(self._retry, "get_block_hash")
96-
self.get_block_header = partial(self._retry, "get_block_header")
97-
self.get_block_metadata = partial(self._retry, "get_block_metadata")
98-
self.get_block_number = partial(self._retry, "get_block_number")
99-
self.get_block_runtime_info = partial(self._retry, "get_block_runtime_info")
81+
82+
self._get_block_handler = partial(retry, "_get_block_handler")
83+
self.apply_type_registry_presets = partial(retry, "apply_type_registry_presets")
84+
self.close = partial(retry, "close")
85+
self.compose_call = partial(retry, "compose_call")
86+
self.connect = partial(retry, "connect")
87+
self.create_scale_object = partial(retry, "create_scale_object")
88+
self.create_signed_extrinsic = partial(retry, "create_signed_extrinsic")
89+
self.create_storage_key = partial(retry, "create_storage_key")
90+
self.decode_scale = partial(retry, "decode_scale")
91+
self.encode_scale = partial(retry, "encode_scale")
92+
self.extension_call = partial(retry, "extension_call")
93+
self.filter_events = partial(retry, "filter_events")
94+
self.filter_extrinsics = partial(retry, "filter_extrinsics")
95+
self.generate_signature_payload = partial(retry, "generate_signature_payload")
96+
self.get_account_next_index = partial(retry, "get_account_next_index")
97+
self.get_account_nonce = partial(retry, "get_account_nonce")
98+
self.get_block = partial(retry, "get_block")
99+
self.get_block_hash = partial(retry, "get_block_hash")
100+
self.get_block_header = partial(retry, "get_block_header")
101+
self.get_block_metadata = partial(retry, "get_block_metadata")
102+
self.get_block_number = partial(retry, "get_block_number")
103+
self.get_block_runtime_info = partial(retry, "get_block_runtime_info")
100104
self.get_block_runtime_version_for = partial(
101-
self._retry, "get_block_runtime_version_for"
102-
)
103-
self.get_block_timestamp = partial(self._retry, "get_block_timestamp")
104-
self.get_chain_finalised_head = partial(self._retry, "get_chain_finalised_head")
105-
self.get_chain_head = partial(self._retry, "get_chain_head")
106-
self.get_constant = partial(self._retry, "get_constant")
107-
self.get_events = partial(self._retry, "get_events")
108-
self.get_extrinsics = partial(self._retry, "get_extrinsics")
109-
self.get_metadata_call_function = partial(
110-
self._retry, "get_metadata_call_function"
105+
retry, "get_block_runtime_version_for"
111106
)
112-
self.get_metadata_constant = partial(self._retry, "get_metadata_constant")
113-
self.get_metadata_error = partial(self._retry, "get_metadata_error")
114-
self.get_metadata_errors = partial(self._retry, "get_metadata_errors")
115-
self.get_metadata_module = partial(self._retry, "get_metadata_module")
116-
self.get_metadata_modules = partial(self._retry, "get_metadata_modules")
107+
self.get_block_timestamp = partial(retry, "get_block_timestamp")
108+
self.get_chain_finalised_head = partial(retry, "get_chain_finalised_head")
109+
self.get_chain_head = partial(retry, "get_chain_head")
110+
self.get_constant = partial(retry, "get_constant")
111+
self.get_events = partial(retry, "get_events")
112+
self.get_extrinsics = partial(retry, "get_extrinsics")
113+
self.get_metadata_call_function = partial(retry, "get_metadata_call_function")
114+
self.get_metadata_constant = partial(retry, "get_metadata_constant")
115+
self.get_metadata_error = partial(retry, "get_metadata_error")
116+
self.get_metadata_errors = partial(retry, "get_metadata_errors")
117+
self.get_metadata_module = partial(retry, "get_metadata_module")
118+
self.get_metadata_modules = partial(retry, "get_metadata_modules")
117119
self.get_metadata_runtime_call_function = partial(
118-
self._retry, "get_metadata_runtime_call_function"
120+
retry, "get_metadata_runtime_call_function"
119121
)
120122
self.get_metadata_runtime_call_functions = partial(
121-
self._retry, "get_metadata_runtime_call_functions"
123+
retry, "get_metadata_runtime_call_functions"
122124
)
123125
self.get_metadata_storage_function = partial(
124-
self._retry, "get_metadata_storage_function"
126+
retry, "get_metadata_storage_function"
125127
)
126128
self.get_metadata_storage_functions = partial(
127-
self._retry, "get_metadata_storage_functions"
128-
)
129-
self.get_parent_block_hash = partial(self._retry, "get_parent_block_hash")
130-
self.get_payment_info = partial(self._retry, "get_payment_info")
131-
self.get_storage_item = partial(self._retry, "get_storage_item")
132-
self.get_type_definition = partial(self._retry, "get_type_definition")
133-
self.get_type_registry = partial(self._retry, "get_type_registry")
134-
self.init_runtime = partial(self._retry, "init_runtime")
135-
self.initialize = partial(self._retry, "initialize")
136-
self.is_valid_ss58_address = partial(self._retry, "is_valid_ss58_address")
137-
self.load_runtime = partial(self._retry, "load_runtime")
138-
self.make_payload = partial(self._retry, "make_payload")
139-
self.query = partial(self._retry, "query")
140-
self.query_map = partial(self._retry, "query_map")
141-
self.query_multi = partial(self._retry, "query_multi")
142-
self.query_multiple = partial(self._retry, "query_multiple")
143-
self.reload_type_registry = partial(self._retry, "reload_type_registry")
144-
self.retrieve_extrinsic_by_hash = partial(
145-
self._retry, "retrieve_extrinsic_by_hash"
129+
retry, "get_metadata_storage_functions"
146130
)
131+
self.get_parent_block_hash = partial(retry, "get_parent_block_hash")
132+
self.get_payment_info = partial(retry, "get_payment_info")
133+
self.get_storage_item = partial(retry, "get_storage_item")
134+
self.get_type_definition = partial(retry, "get_type_definition")
135+
self.get_type_registry = partial(retry, "get_type_registry")
136+
self.init_runtime = partial(retry, "init_runtime")
137+
self.initialize = partial(retry, "initialize")
138+
self.is_valid_ss58_address = partial(retry, "is_valid_ss58_address")
139+
self.load_runtime = partial(retry, "load_runtime")
140+
self.make_payload = partial(retry, "make_payload")
141+
self.query = partial(retry, "query")
142+
self.query_map = partial(retry, "query_map")
143+
self.query_multi = partial(retry, "query_multi")
144+
self.query_multiple = partial(retry, "query_multiple")
145+
self.reload_type_registry = partial(retry, "reload_type_registry")
146+
self.retrieve_extrinsic_by_hash = partial(retry, "retrieve_extrinsic_by_hash")
147147
self.retrieve_extrinsic_by_identifier = partial(
148-
self._retry, "retrieve_extrinsic_by_identifier"
148+
retry, "retrieve_extrinsic_by_identifier"
149149
)
150-
self.rpc_request = partial(self._retry, "rpc_request")
151-
self.runtime_call = partial(self._retry, "runtime_call")
152-
self.search_block_number = partial(self._retry, "search_block_number")
153-
self.serialize_constant = partial(self._retry, "serialize_constant")
154-
self.serialize_module_call = partial(self._retry, "serialize_module_call")
155-
self.serialize_module_error = partial(self._retry, "serialize_module_error")
156-
self.serialize_module_event = partial(self._retry, "serialize_module_event")
157-
self.serialize_storage_item = partial(self._retry, "serialize_storage_item")
158-
self.ss58_decode = partial(self._retry, "ss58_decode")
159-
self.ss58_encode = partial(self._retry, "ss58_encode")
160-
self.submit_extrinsic = partial(self._retry, "submit_extrinsic")
161-
self.subscribe_block_headers = partial(self._retry, "subscribe_block_headers")
162-
self.supports_rpc_method = partial(self._retry, "supports_rpc_method")
150+
self.rpc_request = partial(retry, "rpc_request")
151+
self.runtime_call = partial(retry, "runtime_call")
152+
self.search_block_number = partial(retry, "search_block_number")
153+
self.serialize_constant = partial(retry, "serialize_constant")
154+
self.serialize_module_call = partial(retry, "serialize_module_call")
155+
self.serialize_module_error = partial(retry, "serialize_module_error")
156+
self.serialize_module_event = partial(retry, "serialize_module_event")
157+
self.serialize_storage_item = partial(retry, "serialize_storage_item")
158+
self.ss58_decode = partial(retry, "ss58_decode")
159+
self.ss58_encode = partial(retry, "ss58_encode")
160+
self.submit_extrinsic = partial(retry, "submit_extrinsic")
161+
self.subscribe_block_headers = partial(retry, "subscribe_block_headers")
162+
self.supports_rpc_method = partial(retry, "supports_rpc_method")
163163
self.ws = self._substrate.ws
164164

165165
def _retry(self, method, *args, **kwargs):
166166
try:
167167
method_ = getattr(self._substrate, method)
168168
return method_(*args, **kwargs)
169-
except MaxRetriesExceeded:
169+
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
170170
try:
171171
next_network = next(self.fallback_chains)
172+
if e.__class__ == MaxRetriesExceeded:
173+
logging.error(
174+
f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}."
175+
)
176+
else:
177+
print(f"Connection error. Trying again with {next_network}")
178+
self._substrate = self._substrate_class(
179+
url=next_network,
180+
ss58_format=self.ss58_format,
181+
type_registry=self.type_registry,
182+
use_remote_preset=self.use_remote_preset,
183+
chain_name=self.chain_name,
184+
_mock=self._mock,
185+
)
186+
method_ = getattr(self._substrate, method)
187+
return self._retry(method_(*args, **kwargs))
188+
except StopIteration:
172189
logging.error(
173-
f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}."
190+
f"Max retries exceeded with {self._substrate.url}. No more fallback chains."
174191
)
192+
raise MaxRetriesExceeded
193+
194+
async def _async_retry(self, method, *args, **kwargs):
195+
try:
196+
method_ = getattr(self._substrate, method)
197+
if asyncio.iscoroutinefunction(method_):
198+
return await method_(*args, **kwargs)
199+
else:
200+
return method_(*args, **kwargs)
201+
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
202+
try:
203+
next_network = next(self.fallback_chains)
204+
if e.__class__ == MaxRetriesExceeded:
205+
logging.error(
206+
f"Max retries exceeded with {self._substrate.url}. Retrying with {next_network}."
207+
)
208+
else:
209+
print(f"Connection error. Trying again with {next_network}")
175210
self._substrate = self._substrate_class(
176211
url=next_network,
177212
ss58_format=self.ss58_format,
@@ -181,7 +216,10 @@ def _retry(self, method, *args, **kwargs):
181216
_mock=self._mock,
182217
)
183218
method_ = getattr(self._substrate, method)
184-
return self._retry(method_(*args, **kwargs))
219+
if asyncio.iscoroutinefunction(method_):
220+
return await method_(*args, **kwargs)
221+
else:
222+
return method_(*args, **kwargs)
185223
except StopIteration:
186224
logging.error(
187225
f"Max retries exceeded with {self._substrate.url}. No more fallback chains."

0 commit comments

Comments
 (0)