Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 180 additions & 19 deletions bittensor/core/async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,148 @@ async def encode_params(

return param_data.to_hex()

async def _runtime_method_exists(
self, api: str, method: str, block_hash: str
) -> bool:
"""
Check if a runtime call method exists at the given block.

The complicated logic here comes from the fact that there are two ways in which runtime calls
are stored: the new and primary method is through the Metadata V15, but the V14 is a good backup (implemented
around mid 2024)

Returns:
True if the runtime call method exists, False otherwise.
"""
runtime = await self.substrate.init_runtime(block_hash=block_hash)
if runtime.metadata_v15 is not None:
metadata_v15_value = runtime.metadata_v15.value()
apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]}
try:
api_entry = apis[api]
methods = {entry["name"]: entry for entry in api_entry["methods"]}
_ = methods[method]
return True
except KeyError:
return False
else:
try:
await self.substrate.get_metadata_runtime_call_function(
api=api,
method=method,
block_hash=block_hash,
)
return True
except ValueError:
return False

async def _query_with_fallback(
self,
*args: tuple[str, str, Optional[list[Any]]],
block_hash: Optional[str] = None,
default_value: Any = ValueError,
):
"""
Queries the subtensor node with a given set of args, falling back to the next group if the method
does not exist at the given block. This method exists to support backwards compatibility for blocks.

Parameters:
*args: Tuples containing (module, storage_function, params) in the order they should be attempted.
block_hash: The hash of the block being queried. If not provided, the chain tip will be used.
default_value: The default value to return if none of the methods exist at the given block.

Returns:
The value returned by the subtensor node, or the default value if none of the methods exist at the given
block.

Raises:
ValueError: If no default value is provided, and none of the methods exist at the given block, a
ValueError will be raised.

Example:
value = await self._query_with_fallback(
# the first attempt will be made to SubtensorModule.MechanismEmissionSplit with params `[1]`
("SubtensorModule", "MechanismEmissionSplit", [1]),
# if it does not exist at the given block, the next attempt will be made to
# SubtensorModule.MechanismEmission with params `None`
("SubtensorModule", "MechanismEmission", None),
block_hash="0x1234",
# if none of the methods exist at the given block, the default value of `None` will be returned
default_value=None,
)
"""
if block_hash is None:
block_hash = await self.substrate.get_chain_head()
for module, storage_function, params in args:
if await self.substrate.get_metadata_storage_function(
module_name=module,
storage_name=storage_function,
block_hash=block_hash,
):
return await self.substrate.query(
module=module,
storage_function=storage_function,
block_hash=block_hash,
params=params,
)
if not isinstance(default_value, ValueError):
return default_value
else:
raise default_value

async def _runtime_call_with_fallback(
self,
*args: tuple[str, str, Optional[list[Any]] | dict[str, Any]],
block_hash: Optional[str] = None,
default_value: Any = ValueError,
):
"""
Makes a runtime call to the subtensor node with a given set of args, falling back to the next group if the
api.method does not exist at the given block. This method exists to support backwards compatibility for blocks.

Parameters:
*args: Tuples containing (api, method, params) in the order they should be attempted.
block_hash: The hash of the block being queried. If not provided, the chain tip will be used.
default_value: The default value to return if none of the methods exist at the given block.

Raises:
ValueError: If no default value is provided, and none of the methods exist at the given block, a
ValueError will be raised.

Example:
query = await self._runtime_call_with_fallback(
# the first attempt will be made to SubnetInfoRuntimeApi.get_selective_mechagraph with the
# given params
(
"SubnetInfoRuntimeApi",
"get_selective_mechagraph",
[netuid, mechid, [f for f in range(len(SelectiveMetagraphIndex))]],
),
# if it does not exist at the given block, the next attempt will be made as such:
("SubnetInfoRuntimeApi", "get_metagraph", [[netuid]]),
block_hash=block_hash,
# if none of the methods exist at the given block, the default value will be returned
default_value=None,
)

"""
if block_hash is None:
block_hash = await self.substrate.get_chain_head()
for api, method, params in args:
if await self._runtime_method_exists(
api=api, method=method, block_hash=block_hash
):
return await self.substrate.runtime_call(
api=api,
method=method,
block_hash=block_hash,
params=params,
)
if not isinstance(default_value, ValueError):
return default_value
else:
raise default_value

async def get_hyperparameter(
self,
param_name: str,
Expand Down Expand Up @@ -2673,11 +2815,10 @@ async def get_mechanism_emission_split(
whole numbers). Returns None if emission is evenly split or if the data is unavailable.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
result = await self.substrate.query(
module="SubtensorModule",
storage_function="MechanismEmissionSplit",
params=[netuid],
result = await self._query_with_fallback(
("SubtensorModule", "MechanismEmissionSplit", [netuid]),
block_hash=block_hash,
default_value=None,
)
if result is None or not hasattr(result, "value"):
return None
Expand All @@ -2703,11 +2844,10 @@ async def get_mechanism_count(
The number of mechanisms for the given subnet.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
query = await self.substrate.query(
module="SubtensorModule",
storage_function="MechanismCountCurrent",
params=[netuid],
query = await self._query_with_fallback(
("SubtensorModule", "MechanismCountCurrent", [netuid]),
block_hash=block_hash,
default_value=None,
)
return getattr(query, "value", 1)

Expand Down Expand Up @@ -2769,22 +2909,43 @@ async def get_metagraph_info(
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
if not block_hash and reuse_block:
block_hash = self.substrate.last_block_hash
if not block_hash:
block_hash = await self.substrate.get_chain_head()

indexes = (
[
# Normalize selected_indices to a list of integers
if selected_indices is not None:
indexes = [
f.value if isinstance(f, SelectiveMetagraphIndex) else f
for f in selected_indices
]
if selected_indices is not None
else [f for f in range(len(SelectiveMetagraphIndex))]
)
if 0 not in indexes:
indexes = [0] + indexes
query = await self._runtime_call_with_fallback(
(
"SubnetInfoRuntimeApi",
"get_selective_mechagraph",
[netuid, mechid, indexes],
),
("SubnetInfoRuntimeApi", "get_selective_metagraph", [netuid, indexes]),
block_hash=block_hash,
default_value=ValueError(
"You have specified `selected_indices` to retrieve metagraph info selectively, but the "
"selective runtime calls are not available at this block (probably too old). Do not specify "
"`selected_indices` to retrieve metagraph info selectively."
),
)
else:
query = await self._runtime_call_with_fallback(
(
"SubnetInfoRuntimeApi",
"get_selective_mechagraph",
[netuid, mechid, [f for f in range(len(SelectiveMetagraphIndex))]],
),
("SubnetInfoRuntimeApi", "get_metagraph", [[netuid]]),
block_hash=block_hash,
default_value=None,
)

query = await self.substrate.runtime_call(
api="SubnetInfoRuntimeApi",
method="get_selective_mechagraph",
params=[netuid, mechid, indexes if 0 in indexes else [0] + indexes],
block_hash=block_hash,
)
if getattr(query, "value", None) is None:
logging.error(
f"Subnet mechanism {netuid}.{mechid if mechid else 0} does not exist."
Expand Down
17 changes: 10 additions & 7 deletions bittensor/core/metagraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ async def sync(
lite = self.lite

subtensor = await self._initialize_subtensor(subtensor)

cur_block = None
if (
subtensor.chain_endpoint != settings.ARCHIVE_ENTRYPOINT
or subtensor.network != "archive"
Expand All @@ -1425,7 +1425,10 @@ async def sync(
"network for subtensor and retry."
)
if block is None:
block = await subtensor.get_current_block()
if cur_block is not None:
block = cur_block
else:
block = await subtensor.get_current_block()

# Assign neurons based on 'lite' flag
await self._assign_neurons(block, lite, subtensor)
Expand Down Expand Up @@ -1654,11 +1657,11 @@ async def _apply_extra_info(self, block: int):
)
if metagraph_info:
self._apply_metagraph_info_mixin(metagraph_info=metagraph_info)
self.mechanism_count = await self.subtensor.get_mechanism_count(
netuid=self.netuid, block=block
)
self.emissions_split = await self.subtensor.get_mechanism_emission_split(
netuid=self.netuid, block=block
self.mechanism_count, self.emissions_split = await asyncio.gather(
self.subtensor.get_mechanism_count(netuid=self.netuid, block=block),
self.subtensor.get_mechanism_emission_split(
netuid=self.netuid, block=block
),
)


Expand Down
Loading
Loading