Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 80 additions & 50 deletions bittensor/core/async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
)
from bittensor.utils.balance import (
Balance,
FixedPoint,
check_balance_amount,
fixed_to_float,
)
Expand Down Expand Up @@ -1064,7 +1065,7 @@ async def blocks_since_last_step(
reuse_block=reuse_block,
params=[netuid],
)
return query.value if query is not None and hasattr(query, "value") else query
return cast(Optional[int], getattr(query, "value", query))

async def blocks_since_last_update(
self,
Expand Down Expand Up @@ -1328,13 +1329,12 @@ async def get_admin_freeze_window(
- <https://docs.learnbittensor.org/learn/chain-rate-limits#administrative-freeze-window>
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
return (
await self.substrate.query(
module="SubtensorModule",
storage_function="AdminFreezeWindow",
block_hash=block_hash,
)
).value
query = await self.substrate.query(
module="SubtensorModule",
storage_function="AdminFreezeWindow",
block_hash=block_hash,
)
return cast(int, getattr(query, "value", query))

async def get_all_subnets_info(
self,
Expand Down Expand Up @@ -1929,7 +1929,11 @@ async def get_children_pending(
),
reuse_block_hash=reuse_block,
)
children, cooldown = response.value
pending_value = getattr(response, "value", response)
children, cooldown = cast(
tuple[list[tuple[int, Any]], int],
pending_value,
)

return (
[
Expand Down Expand Up @@ -1978,8 +1982,10 @@ async def get_coldkey_swap_announcement(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
if query is None:
return None
return ColdkeySwapAnnouncementInfo.from_query(
coldkey_ss58=coldkey_ss58, query=query
coldkey_ss58=coldkey_ss58, query=cast(ScaleObj, query)
)

async def get_coldkey_swap_announcements(
Expand Down Expand Up @@ -2050,7 +2056,8 @@ async def get_coldkey_swap_announcement_delay(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return query.value if getattr(query, "value", None) else 0
value = getattr(query, "value", query)
return cast(int, value) if value is not None else 0

async def get_coldkey_swap_reannouncement_delay(
self,
Expand Down Expand Up @@ -2083,7 +2090,8 @@ async def get_coldkey_swap_reannouncement_delay(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return query.value if getattr(query, "value", None) else 0
value = getattr(query, "value", query)
return cast(int, value) if value is not None else 0

async def get_coldkey_swap_dispute(
self,
Expand Down Expand Up @@ -2119,7 +2127,11 @@ async def get_coldkey_swap_dispute(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return ColdkeySwapDisputeInfo.from_query(coldkey_ss58=coldkey_ss58, query=query)
if query is None:
return None
return ColdkeySwapDisputeInfo.from_query(
coldkey_ss58=coldkey_ss58, query=cast(ScaleObj, query)
)

async def get_coldkey_swap_disputes(
self,
Expand Down Expand Up @@ -2295,7 +2307,9 @@ async def get_commitment_metadata(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return commit_data
if commit_data is None:
return ""
return cast(Union[str, dict], getattr(commit_data, "value", commit_data))

async def get_crowdloan_constants(
self,
Expand Down Expand Up @@ -2464,7 +2478,8 @@ async def get_crowdloan_next_id(
storage_function="NextCrowdloanId",
block_hash=block_hash,
)
return int(result.value or 0)
value = getattr(result, "value", result)
return int(value or 0)

async def get_crowdloans(
self,
Expand Down Expand Up @@ -2826,7 +2841,7 @@ async def get_hotkey_owner(
if hk_owner_query:
exists = await self.does_hotkey_exist(hotkey_ss58, block_hash=block_hash)
hotkey_owner = hk_owner_query if exists else None
return hotkey_owner
return cast(Optional[str], getattr(hotkey_owner, "value", hotkey_owner))

async def get_last_bonds_reset(
self,
Expand Down Expand Up @@ -3393,13 +3408,18 @@ async def get_mev_shield_submission(
block_hash=block_hash,
)

if query is None or not isinstance(query, dict):
query_value = getattr(query, "value", query)
if query_value is None or not isinstance(query_value, dict):
return None

autor = decode_account_id(query.get("author"))
commitment = bytes(query.get("commitment")[0])
ciphertext = bytes(query.get("ciphertext")[0])
submitted_in = query.get("submitted_in")
author_raw = cast(Union[bytes, str], query_value.get("author"))
commitment_raw = cast(list[bytes], query_value.get("commitment"))
ciphertext_raw = cast(list[bytes], query_value.get("ciphertext"))
submitted_in = cast(int, query_value.get("submitted_in"))

autor = decode_account_id(author_raw)
commitment = bytes(commitment_raw[0])
ciphertext = bytes(ciphertext_raw[0])

return {
"author": autor,
Expand Down Expand Up @@ -3841,7 +3861,8 @@ async def get_proxy_announcement(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return ProxyAnnouncementInfo.from_dict(query.value[0])
query_value = getattr(query, "value", query)
return ProxyAnnouncementInfo.from_dict(cast(list[Any], query_value)[0])

async def get_proxy_announcements(
self,
Expand Down Expand Up @@ -4057,9 +4078,11 @@ async def get_root_claim_type(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
query_value = getattr(query, "value", query)
claim_type = cast(dict[str, Any], query_value)
# Query returns enum as dict: {"Swap": ()} or {"Keep": ()} or {"KeepSubnets": {"subnets": [1, 2, 3]}}
variant_name = next(iter(query.keys()))
variant_value = query[variant_name]
variant_name = next(iter(claim_type.keys()))
variant_value = claim_type[variant_name]

# For simple variants (Swap, Keep), value is empty tuple, return string
if not variant_value or variant_value == ():
Expand Down Expand Up @@ -4105,7 +4128,8 @@ async def get_root_alpha_dividends_per_subnet(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return Balance.from_rao(query.value).set_unit(netuid=netuid)
value = getattr(query, "value", query)
return Balance.from_rao(cast(int, value)).set_unit(netuid=netuid)

async def get_root_claimable_rate(
self,
Expand Down Expand Up @@ -4173,7 +4197,8 @@ async def get_root_claimable_all_rates(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
bits_list = next(iter(query.value))
query_value = getattr(query, "value", query)
bits_list = next(iter(cast(list[list[tuple[int, FixedPoint]]], query_value)))
return {bits[0]: fixed_to_float(bits[1], frac_bits=32) for bits in bits_list}

async def get_root_claimable_stake(
Expand Down Expand Up @@ -4272,7 +4297,8 @@ async def get_root_claimed(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return Balance.from_rao(query.value).set_unit(netuid=netuid)
value = getattr(query, "value", query)
return Balance.from_rao(cast(int, value)).set_unit(netuid=netuid)

async def get_stake(
self,
Expand Down Expand Up @@ -4322,14 +4348,14 @@ async def get_stake(
params=[hotkey_ss58, netuid],
)

hotkey_alpha: int = getattr(hotkey_alpha_result, "value", 0)
hotkey_alpha = getattr(hotkey_alpha_result, "value", hotkey_alpha_result)
alpha_shares_as_float = fixed_to_float(alpha_shares)
hotkey_shares_as_float = fixed_to_float(hotkey_shares)

if hotkey_shares_as_float == 0:
return Balance.from_rao(0).set_unit(netuid=netuid)

stake = alpha_shares_as_float / hotkey_shares_as_float * hotkey_alpha
stake = alpha_shares_as_float / hotkey_shares_as_float * cast(int, hotkey_alpha)

return Balance.from_rao(int(stake)).set_unit(netuid=netuid)

Expand Down Expand Up @@ -4582,7 +4608,7 @@ async def get_stake_weight(
params=[netuid],
block_hash=block_hash,
)
return [u16_normalized_float(w) for w in result]
return [u16_normalized_float(w) for w in cast(list[int], result or [])]

async def get_start_call_delay(
self,
Expand Down Expand Up @@ -4741,13 +4767,14 @@ async def get_subnet_owner_hotkey(
Returns:
The hotkey of the subnet owner if available; None otherwise.
"""
return await self.query_subtensor(
query = await self.query_subtensor(
name="SubnetOwnerHotkey",
params=[netuid],
block=block,
block_hash=block_hash,
reuse_block=reuse_block,
)
return cast(Optional[str], getattr(query, "value", query))

async def get_subnet_price(
self,
Expand Down Expand Up @@ -4894,7 +4921,7 @@ async def get_subnet_validator_permits(
block_hash=block_hash,
reuse_block=reuse_block,
)
return query.value if query is not None and hasattr(query, "value") else query
return cast(Optional[list[bool]], getattr(query, "value", query))

async def get_timelocked_weight_commits(
self,
Expand Down Expand Up @@ -5115,12 +5142,15 @@ async def get_vote_data(
network, particularly how proposals are received and acted upon by the governing body.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
vote_data: dict[str, Any] = await self.substrate.query(
module="Triumvirate",
storage_function="Voting",
params=[proposal_hash],
block_hash=block_hash,
reuse_block_hash=reuse_block,
vote_data = cast(
Optional[dict[str, Any]],
await self.substrate.query(
module="Triumvirate",
storage_function="Voting",
params=[proposal_hash],
block_hash=block_hash,
reuse_block_hash=reuse_block,
),
)

if vote_data is None:
Expand Down Expand Up @@ -5160,7 +5190,7 @@ async def get_uid_for_hotkey_on_subnet(
block_hash=block_hash,
reuse_block_hash=reuse_block,
)
return getattr(result, "value", result)
return cast(Optional[int], getattr(result, "value", result))

async def filter_netuids_by_registered_hotkeys(
self,
Expand Down Expand Up @@ -5759,23 +5789,21 @@ async def query_identity(
parameters.
"""
block_hash = await self.determine_block_hash(block, block_hash, reuse_block)
identity_info = cast(
dict,
await self.substrate.query(
module="SubtensorModule",
storage_function="IdentitiesV2",
params=[coldkey_ss58],
block_hash=block_hash,
reuse_block_hash=reuse_block,
),
identity_info = await self.substrate.query(
module="SubtensorModule",
storage_function="IdentitiesV2",
params=[coldkey_ss58],
block_hash=block_hash,
reuse_block_hash=reuse_block,
)

if not identity_info:
return None

try:
identity_data = getattr(identity_info, "value", identity_info)
return ChainIdentity.from_dict(
decode_hex_identity_dict(identity_info),
decode_hex_identity_dict(cast(dict[str, Any], identity_data)),
)
except TypeError:
return None
Expand Down Expand Up @@ -6006,7 +6034,9 @@ async def handler(block_data: dict):
return True
return None

current_block = await self.substrate.get_block()
current_block = cast(Optional[dict[str, Any]], await self.substrate.get_block())
if current_block is None:
return False
current_block_hash = current_block.get("header", {}).get("hash")

if block is not None:
Expand Down
Loading