Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 72 additions & 111 deletions bittensor/core/async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
from bittensor_wallet.utils import SS58_FORMAT
from numpy.typing import NDArray
from scalecodec import GenericCall, ScaleType
from scalecodec.base import RuntimeConfiguration
from scalecodec.type_registry import load_type_registry_preset

from bittensor.core.chain_data import (
DelegateInfo,
Expand All @@ -24,7 +22,6 @@
SubnetHyperparameters,
SubnetInfo,
WeightCommitInfo,
custom_rpc_type_registry,
decode_account_id,
DynamicInfo,
)
Expand Down Expand Up @@ -69,8 +66,6 @@
from bittensor.utils import (
decode_hex_identity_dict,
format_error_message,
hex_to_bytes,
ss58_to_vec_u8,
torch,
u16_normalized_float,
_decode_hex_identity_dict,
Expand Down Expand Up @@ -424,11 +419,11 @@ async def query_runtime_api(
self,
runtime_api: str,
method: str,
params: Optional[Union[list[list[int]], dict[str, int], list[int]]] = None,
params: Optional[Union[list[Any], dict[str, Any]]],
block: Optional[int] = None,
block_hash: Optional[str] = None,
reuse_block: bool = False,
) -> Optional[str]:
) -> Optional[Any]:
"""
Queries the runtime API of the Bittensor blockchain, providing a way to interact with the underlying runtime and
retrieve data encoded in Scale Bytes format. This function is essential for advanced users who need to
Expand All @@ -444,47 +439,16 @@ async def query_runtime_api(
reuse_block: Whether to reuse the last-used block hash. Do not set if using block_hash or block

Returns:
The Scale Bytes encoded result from the runtime API call, or `None` if the call fails.
The decoded result from the runtime API call, or `None` if the call fails.

This function enables access to the deeper layers of the Bittensor blockchain, allowing for detailed and
specific interactions with the network's runtime environment.
"""
# TODO why doesn't this just use SubstrateInterface.runtime_call ?
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)

call_definition = TYPE_REGISTRY["runtime_api"][runtime_api]["methods"][method]

data = (
"0x"
if params is None
else await self.encode_params(
call_definition=call_definition, params=params
)
)
api_method = f"{runtime_api}_{method}"

json_result = await self.substrate.rpc_request(
method="state_call",
params=[api_method, data, block_hash] if block_hash else [api_method, data],
reuse_block_hash=reuse_block,
result = await self.substrate.runtime_call(
runtime_api, method, params, block_hash
)

if json_result is None:
return None

return_type = call_definition["type"]

as_scale_bytes = scalecodec.ScaleBytes(json_result["result"]) # type: ignore

rpc_runtime_config = RuntimeConfiguration()
rpc_runtime_config.update_type_registry(load_type_registry_preset("legacy"))
rpc_runtime_config.update_type_registry(custom_rpc_type_registry)

obj = rpc_runtime_config.create_scale_object(return_type, as_scale_bytes)
if obj.data.to_hex() == "0x0400": # RPC returned None result
return None

return obj.decode()
return result.value

async def query_subtensor(
self,
Expand Down Expand Up @@ -793,14 +757,18 @@ async def get_all_subnets_info(
Gaining insights into the subnets' details assists in understanding the network's composition, the roles of
different subnets, and their unique features.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
"SubnetInfoRuntimeApi", "get_subnets_info", params=[], block_hash=block_hash
result = await self.query_runtime_api(
runtime_api="SubnetInfoRuntimeApi",
method="get_subnets_info",
params=[],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)
if not hex_bytes_result:
if not result:
return []
else:
return SubnetInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return SubnetInfo.list_from_any(result)

async def get_balance(
self,
Expand Down Expand Up @@ -1043,19 +1011,19 @@ async def get_delegate_by_hotkey(
network's consensus and governance structures.
"""

block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
encoded_hotkey = ss58_to_vec_u8(hotkey_ss58)

json_body = await self.substrate.rpc_request(
method="delegateInfo_getDelegate", # custom rpc method
params=([encoded_hotkey, block_hash] if block_hash else [encoded_hotkey]),
reuse_block_hash=reuse_block,
result = await self.query_runtime_api(
runtime_api="DelegateInfoRuntimeApi",
method="get_delegate",
params=[hotkey_ss58],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)

if not (result := json_body.get("result", None)):
if not result:
return None

return DelegateInfo.from_vec_u8(bytes(result))
return DelegateInfo.from_any(result)

async def get_delegate_identities(
self,
Expand Down Expand Up @@ -1192,18 +1160,19 @@ async def get_delegated(
the network's delegation and consensus mechanisms.
"""

block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
encoded_coldkey = ss58_to_vec_u8(coldkey_ss58)
json_body = await self.substrate.rpc_request(
method="delegateInfo_getDelegated",
params=([block_hash, encoded_coldkey] if block_hash else [encoded_coldkey]),
reuse_block_hash=reuse_block,
result = await self.query_runtime_api(
runtime_api="DelegateInfoRuntimeApi",
method="get_delegated",
params=[coldkey_ss58],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)

if not (result := json_body.get("result")):
if not result:
return []

return DelegateInfo.delegated_list_from_vec_u8(bytes(result))
return DelegateInfo.delegated_list_from_any(result)

async def get_delegates(
self,
Expand All @@ -1222,16 +1191,16 @@ async def get_delegates(
Returns:
List of DelegateInfo objects, or an empty list if there are no delegates.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="DelegateInfoRuntimeApi",
method="get_delegates",
params=[],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)
if hex_bytes_result:
return DelegateInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
if result:
return DelegateInfo.list_from_any(result)
else:
return []

Expand Down Expand Up @@ -1508,18 +1477,17 @@ async def get_neuron_for_pubkey_and_subnet(
if uid is None:
return NeuronInfo.get_null_neuron()

params = [netuid, uid.value]
json_body = await self.substrate.rpc_request(
method="neuronInfo_getNeuron",
params=params,
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neuron",
params=[netuid, uid.value],
block_hash=block_hash,
reuse_block_hash=reuse_block,
)

if not (result := json_body.get("result", None)):
if not result:
return NeuronInfo.get_null_neuron()

return NeuronInfo.from_vec_u8(bytes(result))
return NeuronInfo.from_any(result)

async def get_stake(
self,
Expand Down Expand Up @@ -1649,21 +1617,19 @@ async def get_stake_info_for_coldkey(
Stake information is vital for account holders to assess their investment and participation in the network's
delegation and consensus processes.
"""
encoded_coldkey = ss58_to_vec_u8(coldkey_ss58)

block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="StakeInfoRuntimeApi",
method="get_stake_info_for_coldkey",
params=[encoded_coldkey],
params=[coldkey_ss58],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)

if not hex_bytes_result:
if not result:
return []

return StakeInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return StakeInfo.list_from_any(result)

async def get_subnet_burn_cost(
self,
Expand All @@ -1686,11 +1652,11 @@ async def get_subnet_burn_cost(
The subnet burn cost is an important economic parameter, reflecting the network's mechanisms for controlling
the proliferation of subnets and ensuring their commitment to the network's long-term viability.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
lock_cost = await self.query_runtime_api(
runtime_api="SubnetRegistrationRuntimeApi",
method="get_network_registration_cost",
params=[],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)
Expand Down Expand Up @@ -1720,19 +1686,19 @@ async def get_subnet_hyperparameters(
Understanding the hyperparameters is crucial for comprehending how subnets are configured and managed, and how
they interact with the network's consensus and incentive mechanisms.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="SubnetInfoRuntimeApi",
method="get_subnet_hyperparams",
params=[netuid],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)

if not hex_bytes_result:
if not result:
return None

return SubnetHyperparameters.from_vec_u8(hex_to_bytes(hex_bytes_result))
return SubnetHyperparameters.from_any(result)

async def get_subnet_reveal_period_epochs(
self, netuid: int, block: Optional[int] = None, block_hash: Optional[str] = None
Expand Down Expand Up @@ -2402,22 +2368,19 @@ async def neuron_for_uid(
if uid is None:
return NeuronInfo.get_null_neuron()

block_hash = await self.determine_block_hash(block, block_hash, reuse_block)

if reuse_block:
block_hash = self.substrate.last_block_hash

params = [netuid, uid, block_hash] if block_hash else [netuid, uid]
json_body = await self.substrate.rpc_request(
method="neuronInfo_getNeuron",
params=params, # custom rpc method
reuse_block_hash=reuse_block,
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neuron",
params=[netuid, uid],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)
if not (result := json_body.get("result", None)):

if not result:
return NeuronInfo.get_null_neuron()

bytes_result = bytes(result)
return NeuronInfo.from_vec_u8(bytes_result)
return NeuronInfo.from_any(result)

async def neurons(
self,
Expand All @@ -2443,19 +2406,19 @@ async def neurons(
Understanding the distribution and status of neurons within a subnet is key to comprehending the network's
decentralized structure and the dynamics of its consensus and governance processes.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neurons",
params=[netuid],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)

if not hex_bytes_result:
if not result:
return []

return NeuronInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return NeuronInfo.list_from_any(result)

async def neurons_lite(
self,
Expand All @@ -2481,21 +2444,19 @@ async def neurons_lite(
This function offers a quick overview of the neuron population within a subnet, facilitating efficient analysis
of the network's decentralized structure and neuron dynamics.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neurons_lite",
params=[
netuid
], # TODO check to see if this can accept more than one at a time
params=[netuid],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)

if not hex_bytes_result:
if not result:
return []

return NeuronInfoLite.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return NeuronInfoLite.list_from_any(result)

async def query_identity(
self,
Expand Down
2 changes: 1 addition & 1 deletion bittensor/core/chain_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .subnet_info import SubnetInfo
from .subnet_state import SubnetState
from .weight_commit_info import WeightCommitInfo
from .utils import custom_rpc_type_registry, decode_account_id, process_stake_data
from .utils import decode_account_id, process_stake_data

ProposalCallData = GenericCall

Expand Down
Loading
Loading