Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 105 additions & 10 deletions bittensor/core/async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2673,9 +2673,17 @@ async def get_mechanism_emission_split(
whole numbers). Returns None if emission is evenly split or if the data is unavailable.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
module = "SubtensorModule"
storage_function = "MechanismEmissionSplit"
if not await self.substrate.get_metadata_storage_function(
module,
storage_function,
block_hash=block_hash,
):
return None
result = await self.substrate.query(
module="SubtensorModule",
storage_function="MechanismEmissionSplit",
module=module,
storage_function=storage_function,
params=[netuid],
block_hash=block_hash,
)
Expand Down Expand Up @@ -2703,14 +2711,57 @@ async def get_mechanism_count(
The number of mechanisms for the given subnet.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
module = "SubtensorModule"
storage_function = "MechanismCountCurrent"
if not await self.substrate.get_metadata_storage_function(
module,
storage_function,
block_hash=block_hash,
):
return 1
query = await self.substrate.query(
module="SubtensorModule",
storage_function="MechanismCountCurrent",
module=module,
storage_function=storage_function,
params=[netuid],
block_hash=block_hash,
)
return getattr(query, "value", 1)

async def _runtime_method_exists(
self, api: str, method: str, block_hash: str
) -> bool:
"""
Check if a runtime call method exists at the given block.

The complicated logic here comes from the fact that there are two ways in which runtime calls
are stored: the new and primary method is through the Metadata V15, but the V14 is a good backup (implemented
around mid 2024)

Returns:
True if the runtime call method exists, False otherwise.
"""
runtime = await self.substrate.init_runtime(block_hash=block_hash)
if runtime.metadata_v15 is not None:
metadata_v15_value = runtime.metadata_v15.value()
apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]}
try:
api_entry = apis[api]
methods = {entry["name"]: entry for entry in api_entry["methods"]}
_ = methods[method]
return True
except KeyError:
return False
else:
try:
await self.substrate.get_metadata_runtime_call_function(
api=api,
method=method,
block_hash=block_hash,
)
return True
except ValueError:
return False

async def get_metagraph_info(
self,
netuid: int,
Expand Down Expand Up @@ -2769,22 +2820,66 @@ async def get_metagraph_info(
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
if not block_hash and reuse_block:
block_hash = self.substrate.last_block_hash
if not block_hash:
block_hash = await self.substrate.get_chain_head()

indexes = (
[
async def _determine_fetch_method(block_hash_: str) -> tuple[str, list]:
"""Determine the best available runtime call method and its parameters for the given block/params"""

# Try selective mechagraph first (newest)
if await self._runtime_method_exists(
"SubnetInfoRuntimeApi", "get_selective_mechagraph", block_hash_
):
if indexes is not None:
return "get_selective_mechagraph", [netuid, mechid, indexes]
return "get_selective_mechagraph", [
netuid,
mechid,
list(range(len(SelectiveMetagraphIndex))),
]

# Try selective metagraph (older)
if await self._runtime_method_exists(
"SubnetInfoRuntimeApi", "get_selective_metagraph", block_hash_
):
if indexes is not None:
return "get_selective_metagraph", [netuid, indexes]
# Fall through to get_metagraph if no indexes specified
elif indexes is not None:
# User requested selective retrieval but it's not available
raise ValueError(
"You have specified `selected_indices` to retrieve metagraph info selectively, but the "
"selective runtime calls are not available at this block (probably too old). Do not specify "
"`selected_indices` to retrieve metagraph info selectively."
)

# Fall back to old metagraph
return "get_metagraph", [netuid]

# Normalize selected_indices to a list of integers
if selected_indices is not None:
indexes = [
f.value if isinstance(f, SelectiveMetagraphIndex) else f
for f in selected_indices
]
if selected_indices is not None
else [f for f in range(len(SelectiveMetagraphIndex))]
if 0 not in indexes:
indexes = [0] + indexes
else:
indexes = None

# Determine the best available fetch method
fetch_method, params = await _determine_fetch_method(
block_hash_=block_hash,
)

# Execute the query
query = await self.substrate.runtime_call(
api="SubnetInfoRuntimeApi",
method="get_selective_mechagraph",
params=[netuid, mechid, indexes if 0 in indexes else [0] + indexes],
method=fetch_method,
params=params,
block_hash=block_hash,
)

if getattr(query, "value", None) is None:
logging.error(
f"Subnet mechanism {netuid}.{mechid if mechid else 0} does not exist."
Expand Down
17 changes: 10 additions & 7 deletions bittensor/core/metagraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ async def sync(
lite = self.lite

subtensor = await self._initialize_subtensor(subtensor)

cur_block = None
if (
subtensor.chain_endpoint != settings.ARCHIVE_ENTRYPOINT
or subtensor.network != "archive"
Expand All @@ -1425,7 +1425,10 @@ async def sync(
"network for subtensor and retry."
)
if block is None:
block = await subtensor.get_current_block()
if cur_block is not None:
block = cur_block
else:
block = await subtensor.get_current_block()

# Assign neurons based on 'lite' flag
await self._assign_neurons(block, lite, subtensor)
Expand Down Expand Up @@ -1654,11 +1657,11 @@ async def _apply_extra_info(self, block: int):
)
if metagraph_info:
self._apply_metagraph_info_mixin(metagraph_info=metagraph_info)
self.mechanism_count = await self.subtensor.get_mechanism_count(
netuid=self.netuid, block=block
)
self.emissions_split = await self.subtensor.get_mechanism_emission_split(
netuid=self.netuid, block=block
self.mechanism_count, self.emissions_split = await asyncio.gather(
self.subtensor.get_mechanism_count(netuid=self.netuid, block=block),
self.subtensor.get_mechanism_emission_split(
netuid=self.netuid, block=block
),
)


Expand Down
108 changes: 99 additions & 9 deletions bittensor/core/subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1930,6 +1930,12 @@ def get_mechanism_emission_split(
whole numbers). Returns None if emission is evenly split or if the data is unavailable.
"""
block_hash = self.determine_block_hash(block)
module = "SubtensorModule"
storage_function = "MechanismEmissionSplit"
if not self.substrate.get_metadata_storage_function(
module, storage_function, block_hash=block_hash
):
return None
result = self.substrate.query(
module="SubtensorModule",
storage_function="MechanismEmissionSplit",
Expand All @@ -1956,14 +1962,53 @@ def get_mechanism_count(
The number of mechanisms for the given subnet.
"""
block_hash = self.determine_block_hash(block)
module = "SubtensorModule"
storage_function = "MechanismCountCurrent"
if not self.substrate.get_metadata_storage_function(
module, storage_function, block_hash=block_hash
):
return 1
query = self.substrate.query(
module="SubtensorModule",
storage_function="MechanismCountCurrent",
module=module,
storage_function=storage_function,
params=[netuid],
block_hash=block_hash,
)
return query.value if query is not None and hasattr(query, "value") else 1

def _runtime_method_exists(self, api: str, method: str, block_hash: str) -> bool:
"""
Check if a runtime call method exists at the given block.

The complicated logic here comes from the fact that there are two ways in which runtime calls
are stored: the new and primary method is through the Metadata V15, but the V14 is a good backup (implemented
around mid 2024)

Returns:
True if the runtime call method exists, False otherwise.
"""
runtime = self.substrate.init_runtime(block_hash=block_hash)
if runtime.metadata_v15 is not None:
metadata_v15_value = runtime.metadata_v15.value()
apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]}
try:
api_entry = apis[api]
methods = {entry["name"]: entry for entry in api_entry["methods"]}
_ = methods[method]
return True
except KeyError:
return False
else:
try:
self.substrate.get_metadata_runtime_call_function(
api=api,
method=method,
block_hash=block_hash,
)
return True
except ValueError:
return False

def get_metagraph_info(
self,
netuid: int,
Expand Down Expand Up @@ -2011,23 +2056,68 @@ def get_metagraph_info(
- <https://docs.learnbittensor.org/glossary#metagraph>
- <https://docs.learnbittensor.org/glossary#emission>
"""
block_hash = self.determine_block_hash(block=block)

indexes = (
[
def _determine_fetch_method(block_hash_: str) -> tuple[str, list]:
"""Determine the best available runtime call method and its parameters for the given block/params"""

# Try selective mechagraph first (newest)
if self._runtime_method_exists(
"SubnetInfoRuntimeApi", "get_selective_mechagraph", block_hash_
):
if indexes is not None:
return "get_selective_mechagraph", [netuid, mechid, indexes]
return "get_selective_mechagraph", [
netuid,
mechid,
list(range(len(SelectiveMetagraphIndex))),
]

# Try selective metagraph (older)
if self._runtime_method_exists(
"SubnetInfoRuntimeApi", "get_selective_metagraph", block_hash_
):
if indexes is not None:
return "get_selective_metagraph", [netuid, indexes]
# Fall through to get_metagraph if no indexes specified
elif indexes is not None:
# User requested selective retrieval but it's not available
raise ValueError(
"You have specified `selected_indices` to retrieve metagraph info selectively, but the "
"selective runtime calls are not available at this block (probably too old). Do not specify "
"`selected_indices` to retrieve metagraph info selectively."
)

# Fall back to old metagraph
return "get_metagraph", [netuid]

block_hash: str = (
self.determine_block_hash(block=block) or self.substrate.get_chain_head()
)

# Normalize selected_indices to a list of integers
if selected_indices is not None:
indexes = [
f.value if isinstance(f, SelectiveMetagraphIndex) else f
for f in selected_indices
]
if selected_indices is not None
else [f for f in range(len(SelectiveMetagraphIndex))]
if 0 not in indexes:
indexes = [0] + indexes
else:
indexes = None

# Determine the best available fetch method
fetch_method, params = _determine_fetch_method(
block_hash_=block_hash,
)

# Execute the query
query = self.substrate.runtime_call(
api="SubnetInfoRuntimeApi",
method="get_selective_mechagraph",
params=[netuid, mechid, indexes if 0 in indexes else [0] + indexes],
method=fetch_method,
params=params,
block_hash=block_hash,
)

if query is None or not hasattr(query, "value") or query.value is None:
logging.error(
f"Subnet mechanism {netuid}.{mechid if mechid else 0} does not exist."
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers/integration_websocket_at_version.txt

Large diffs are not rendered by default.

Loading