diff --git a/bittensor/core/async_subtensor.py b/bittensor/core/async_subtensor.py index 273c8b8c4c..2a5fe6d3d9 100644 --- a/bittensor/core/async_subtensor.py +++ b/bittensor/core/async_subtensor.py @@ -161,6 +161,7 @@ ) from bittensor.utils.balance import ( Balance, + FixedPoint, check_balance_amount, fixed_to_float, ) @@ -1064,7 +1065,7 @@ async def blocks_since_last_step( reuse_block=reuse_block, params=[netuid], ) - return query.value if query is not None and hasattr(query, "value") else query + return cast(Optional[int], getattr(query, "value", query)) async def blocks_since_last_update( self, @@ -1328,13 +1329,12 @@ async def get_admin_freeze_window( - """ block_hash = await self.determine_block_hash(block, block_hash, reuse_block) - return ( - await self.substrate.query( - module="SubtensorModule", - storage_function="AdminFreezeWindow", - block_hash=block_hash, - ) - ).value + query = await self.substrate.query( + module="SubtensorModule", + storage_function="AdminFreezeWindow", + block_hash=block_hash, + ) + return cast(int, getattr(query, "value", query)) async def get_all_subnets_info( self, @@ -1929,7 +1929,11 @@ async def get_children_pending( ), reuse_block_hash=reuse_block, ) - children, cooldown = response.value + pending_value = getattr(response, "value", response) + children, cooldown = cast( + tuple[list[tuple[int, Any]], int], + pending_value, + ) return ( [ @@ -1978,8 +1982,10 @@ async def get_coldkey_swap_announcement( block_hash=block_hash, reuse_block_hash=reuse_block, ) + if query is None: + return None return ColdkeySwapAnnouncementInfo.from_query( - coldkey_ss58=coldkey_ss58, query=query + coldkey_ss58=coldkey_ss58, query=cast(ScaleObj, query) ) async def get_coldkey_swap_announcements( @@ -2050,7 +2056,8 @@ async def get_coldkey_swap_announcement_delay( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return query.value if getattr(query, "value", None) else 0 + value = getattr(query, "value", query) + return cast(int, value) if value is not None else 0 async def get_coldkey_swap_reannouncement_delay( self, @@ -2083,7 +2090,8 @@ async def get_coldkey_swap_reannouncement_delay( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return query.value if getattr(query, "value", None) else 0 + value = getattr(query, "value", query) + return cast(int, value) if value is not None else 0 async def get_coldkey_swap_dispute( self, @@ -2119,7 +2127,11 @@ async def get_coldkey_swap_dispute( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return ColdkeySwapDisputeInfo.from_query(coldkey_ss58=coldkey_ss58, query=query) + if query is None: + return None + return ColdkeySwapDisputeInfo.from_query( + coldkey_ss58=coldkey_ss58, query=cast(ScaleObj, query) + ) async def get_coldkey_swap_disputes( self, @@ -2295,7 +2307,9 @@ async def get_commitment_metadata( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return commit_data + if commit_data is None: + return "" + return cast(Union[str, dict], getattr(commit_data, "value", commit_data)) async def get_crowdloan_constants( self, @@ -2464,7 +2478,8 @@ async def get_crowdloan_next_id( storage_function="NextCrowdloanId", block_hash=block_hash, ) - return int(result.value or 0) + value = getattr(result, "value", result) + return int(value or 0) async def get_crowdloans( self, @@ -2826,7 +2841,7 @@ async def get_hotkey_owner( if hk_owner_query: exists = await self.does_hotkey_exist(hotkey_ss58, block_hash=block_hash) hotkey_owner = hk_owner_query if exists else None - return hotkey_owner + return cast(Optional[str], getattr(hotkey_owner, "value", hotkey_owner)) async def get_last_bonds_reset( self, @@ -3393,13 +3408,18 @@ async def get_mev_shield_submission( block_hash=block_hash, ) - if query is None or not isinstance(query, dict): + query_value = getattr(query, "value", query) + if query_value is None or not isinstance(query_value, dict): return None - autor = decode_account_id(query.get("author")) - commitment = bytes(query.get("commitment")[0]) - ciphertext = bytes(query.get("ciphertext")[0]) - submitted_in = query.get("submitted_in") + author_raw = cast(Union[bytes, str], query_value.get("author")) + commitment_raw = cast(list[bytes], query_value.get("commitment")) + ciphertext_raw = cast(list[bytes], query_value.get("ciphertext")) + submitted_in = cast(int, query_value.get("submitted_in")) + + autor = decode_account_id(author_raw) + commitment = bytes(commitment_raw[0]) + ciphertext = bytes(ciphertext_raw[0]) return { "author": autor, @@ -3841,7 +3861,8 @@ async def get_proxy_announcement( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return ProxyAnnouncementInfo.from_dict(query.value[0]) + query_value = getattr(query, "value", query) + return ProxyAnnouncementInfo.from_dict(cast(list[Any], query_value)[0]) async def get_proxy_announcements( self, @@ -4057,9 +4078,11 @@ async def get_root_claim_type( block_hash=block_hash, reuse_block_hash=reuse_block, ) + query_value = getattr(query, "value", query) + claim_type = cast(dict[str, Any], query_value) # Query returns enum as dict: {"Swap": ()} or {"Keep": ()} or {"KeepSubnets": {"subnets": [1, 2, 3]}} - variant_name = next(iter(query.keys())) - variant_value = query[variant_name] + variant_name = next(iter(claim_type.keys())) + variant_value = claim_type[variant_name] # For simple variants (Swap, Keep), value is empty tuple, return string if not variant_value or variant_value == (): @@ -4105,7 +4128,8 @@ async def get_root_alpha_dividends_per_subnet( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return Balance.from_rao(query.value).set_unit(netuid=netuid) + value = getattr(query, "value", query) + return Balance.from_rao(cast(int, value)).set_unit(netuid=netuid) async def get_root_claimable_rate( self, @@ -4173,7 +4197,8 @@ async def get_root_claimable_all_rates( block_hash=block_hash, reuse_block_hash=reuse_block, ) - bits_list = next(iter(query.value)) + query_value = getattr(query, "value", query) + bits_list = next(iter(cast(list[list[tuple[int, FixedPoint]]], query_value))) return {bits[0]: fixed_to_float(bits[1], frac_bits=32) for bits in bits_list} async def get_root_claimable_stake( @@ -4272,7 +4297,8 @@ async def get_root_claimed( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return Balance.from_rao(query.value).set_unit(netuid=netuid) + value = getattr(query, "value", query) + return Balance.from_rao(cast(int, value)).set_unit(netuid=netuid) async def get_stake( self, @@ -4322,14 +4348,14 @@ async def get_stake( params=[hotkey_ss58, netuid], ) - hotkey_alpha: int = getattr(hotkey_alpha_result, "value", 0) + hotkey_alpha = getattr(hotkey_alpha_result, "value", hotkey_alpha_result) alpha_shares_as_float = fixed_to_float(alpha_shares) hotkey_shares_as_float = fixed_to_float(hotkey_shares) if hotkey_shares_as_float == 0: return Balance.from_rao(0).set_unit(netuid=netuid) - stake = alpha_shares_as_float / hotkey_shares_as_float * hotkey_alpha + stake = alpha_shares_as_float / hotkey_shares_as_float * cast(int, hotkey_alpha) return Balance.from_rao(int(stake)).set_unit(netuid=netuid) @@ -4582,7 +4608,7 @@ async def get_stake_weight( params=[netuid], block_hash=block_hash, ) - return [u16_normalized_float(w) for w in result] + return [u16_normalized_float(w) for w in cast(list[int], result or [])] async def get_start_call_delay( self, @@ -4741,13 +4767,14 @@ async def get_subnet_owner_hotkey( Returns: The hotkey of the subnet owner if available; None otherwise. """ - return await self.query_subtensor( + query = await self.query_subtensor( name="SubnetOwnerHotkey", params=[netuid], block=block, block_hash=block_hash, reuse_block=reuse_block, ) + return cast(Optional[str], getattr(query, "value", query)) async def get_subnet_price( self, @@ -4894,7 +4921,7 @@ async def get_subnet_validator_permits( block_hash=block_hash, reuse_block=reuse_block, ) - return query.value if query is not None and hasattr(query, "value") else query + return cast(Optional[list[bool]], getattr(query, "value", query)) async def get_timelocked_weight_commits( self, @@ -5115,12 +5142,15 @@ async def get_vote_data( network, particularly how proposals are received and acted upon by the governing body. """ block_hash = await self.determine_block_hash(block, block_hash, reuse_block) - vote_data: dict[str, Any] = await self.substrate.query( - module="Triumvirate", - storage_function="Voting", - params=[proposal_hash], - block_hash=block_hash, - reuse_block_hash=reuse_block, + vote_data = cast( + Optional[dict[str, Any]], + await self.substrate.query( + module="Triumvirate", + storage_function="Voting", + params=[proposal_hash], + block_hash=block_hash, + reuse_block_hash=reuse_block, + ), ) if vote_data is None: @@ -5160,7 +5190,7 @@ async def get_uid_for_hotkey_on_subnet( block_hash=block_hash, reuse_block_hash=reuse_block, ) - return getattr(result, "value", result) + return cast(Optional[int], getattr(result, "value", result)) async def filter_netuids_by_registered_hotkeys( self, @@ -5759,23 +5789,21 @@ async def query_identity( parameters. """ block_hash = await self.determine_block_hash(block, block_hash, reuse_block) - identity_info = cast( - dict, - await self.substrate.query( - module="SubtensorModule", - storage_function="IdentitiesV2", - params=[coldkey_ss58], - block_hash=block_hash, - reuse_block_hash=reuse_block, - ), + identity_info = await self.substrate.query( + module="SubtensorModule", + storage_function="IdentitiesV2", + params=[coldkey_ss58], + block_hash=block_hash, + reuse_block_hash=reuse_block, ) if not identity_info: return None try: + identity_data = getattr(identity_info, "value", identity_info) return ChainIdentity.from_dict( - decode_hex_identity_dict(identity_info), + decode_hex_identity_dict(cast(dict[str, Any], identity_data)), ) except TypeError: return None @@ -6006,7 +6034,9 @@ async def handler(block_data: dict): return True return None - current_block = await self.substrate.get_block() + current_block = cast(Optional[dict[str, Any]], await self.substrate.get_block()) + if current_block is None: + return False current_block_hash = current_block.get("header", {}).get("hash") if block is not None: diff --git a/bittensor/core/subtensor.py b/bittensor/core/subtensor.py index e2fcb0b25f..9e586ef1be 100644 --- a/bittensor/core/subtensor.py +++ b/bittensor/core/subtensor.py @@ -495,7 +495,7 @@ def _runtime_call_with_fallback( def get_hyperparameter( self, param_name: str, netuid: int, block: Optional[int] = None - ) -> Optional[Any]: + ) -> Any | None: """Retrieves a specified hyperparameter for a specific subnet. This method queries the blockchain for subnet-specific hyperparameters such as difficulty, tempo, immunity @@ -869,7 +869,7 @@ def blocks_since_last_step( query = self.query_subtensor( name="BlocksSinceLastStep", block=block, params=[netuid] ) - return query.value if query is not None and hasattr(query, "value") else query + return cast(Optional[int], getattr(query, "value", query)) def blocks_since_last_update( self, netuid: int, uid: int, block: Optional[int] = None @@ -1060,11 +1060,12 @@ def get_admin_freeze_window(self, block: Optional[int] = None) -> int: - """ - return self.substrate.query( + query = self.substrate.query( module="SubtensorModule", storage_function="AdminFreezeWindow", block_hash=self.determine_block_hash(block), - ).value + ) + return cast(int, getattr(query, "value", query)) def get_all_subnets_info(self, block: Optional[int] = None) -> list["SubnetInfo"]: """Retrieves detailed information about all subnets within the Bittensor network. @@ -1351,7 +1352,7 @@ def get_balance(self, address: str, block: Optional[int] = None) -> Balance: params=[address], block_hash=self.determine_block_hash(block), ) - return Balance(balance["data"]["free"]) + return Balance(cast(dict[str, Any], balance)["data"]["free"]) def get_balances( self, @@ -1546,12 +1547,16 @@ def get_children_pending( - """ - children, cooldown = self.substrate.query( + pending_query = self.substrate.query( module="SubtensorModule", storage_function="PendingChildKeys", params=[netuid, hotkey_ss58], block_hash=self.determine_block_hash(block), - ).value + ) + children, cooldown = cast( + tuple[list[tuple[int, Any]], int], + getattr(pending_query, "value", pending_query), + ) return ( [ @@ -1595,8 +1600,10 @@ def get_coldkey_swap_announcement( params=[coldkey_ss58], block_hash=block_hash, ) + if query is None: + return None return ColdkeySwapAnnouncementInfo.from_query( - coldkey_ss58=coldkey_ss58, query=query + coldkey_ss58=coldkey_ss58, query=cast(ScaleObj, query) ) def get_coldkey_swap_announcements( @@ -1654,7 +1661,8 @@ def get_coldkey_swap_announcement_delay( storage_function="ColdkeySwapAnnouncementDelay", block_hash=block_hash, ) - return query.value if getattr(query, "value", None) else 0 + value = getattr(query, "value", query) + return cast(int, value) if value is not None else 0 def get_coldkey_swap_constants( self, @@ -1728,7 +1736,8 @@ def get_coldkey_swap_reannouncement_delay( storage_function="ColdkeySwapReannouncementDelay", block_hash=block_hash, ) - return query.value if getattr(query, "value", None) else 0 + value = getattr(query, "value", query) + return cast(int, value) if value is not None else 0 def get_coldkey_swap_dispute( self, @@ -1759,7 +1768,11 @@ def get_coldkey_swap_dispute( params=[coldkey_ss58], block_hash=block_hash, ) - return ColdkeySwapDisputeInfo.from_query(coldkey_ss58=coldkey_ss58, query=query) + if query is None: + return None + return ColdkeySwapDisputeInfo.from_query( + coldkey_ss58=coldkey_ss58, query=cast(ScaleObj, query) + ) def get_coldkey_swap_disputes( self, @@ -1850,7 +1863,9 @@ def get_commitment_metadata( params=[netuid, hotkey_ss58], block_hash=self.determine_block_hash(block), ) - return commit_data + if commit_data is None: + return "" + return cast(Union[str, dict], getattr(commit_data, "value", commit_data)) def get_crowdloan_constants( self, @@ -1992,7 +2007,8 @@ def get_crowdloan_next_id( storage_function="NextCrowdloanId", block_hash=block_hash, ) - return int(result.value or 0) + value = cast(int, getattr(result, "value", result)) + return int(value) or 0 def get_crowdloans( self, @@ -2280,7 +2296,7 @@ def get_hotkey_owner( if hk_owner_query: exists = self.does_hotkey_exist(hotkey_ss58, block=block) hotkey_owner = hk_owner_query if exists else None - return hotkey_owner + return cast(Optional[str], getattr(hotkey_owner, "value", hotkey_owner)) def get_last_bonds_reset( self, netuid: int, hotkey_ss58: str, block: Optional[int] = None @@ -2793,13 +2809,18 @@ def get_mev_shield_submission( block_hash=block_hash, ) - if query is None or not isinstance(query, dict): + query_value = getattr(query, "value", query) + if query_value is None or not isinstance(query_value, dict): return None - autor = decode_account_id(query.get("author")) - commitment = bytes(query.get("commitment")[0]) - ciphertext = bytes(query.get("ciphertext")[0]) - submitted_in = query.get("submitted_in") + author_raw = cast(Union[bytes, str], query_value.get("author")) + commitment_raw = cast(list[bytes], query_value.get("commitment")) + ciphertext_raw = cast(list[bytes], query_value.get("ciphertext")) + submitted_in = cast(int, query_value.get("submitted_in")) + + autor = decode_account_id(author_raw) + commitment = bytes(commitment_raw[0]) + ciphertext = bytes(ciphertext_raw[0]) return { "author": autor, @@ -3167,7 +3188,8 @@ def get_proxy_announcement( params=[delegate_account_ss58], block_hash=block_hash, ) - return ProxyAnnouncementInfo.from_dict(query.value[0]) + query_value = getattr(query, "value", query) + return ProxyAnnouncementInfo.from_dict(cast(list[Any], query_value)[0]) def get_proxy_announcements( self, @@ -3357,9 +3379,11 @@ def get_root_claim_type( params=[coldkey_ss58], block_hash=self.determine_block_hash(block), ) + query_value = getattr(query, "value", query) + claim_type = cast(dict[str, Any], query_value) # Query returns enum as dict: {"Swap": ()} or {"Keep": ()} or {"KeepSubnets": {"subnets": [1, 2, 3]}} - variant_name = next(iter(query.keys())) - variant_value = query[variant_name] + variant_name = next(iter(claim_type.keys())) + variant_value = claim_type[variant_name] # For simple variants (Swap, Keep), value is empty tuple, return string if not variant_value or variant_value == (): @@ -3399,7 +3423,8 @@ def get_root_alpha_dividends_per_subnet( params=[netuid, hotkey_ss58], block_hash=self.determine_block_hash(block), ) - return Balance.from_rao(query.value).set_unit(netuid=netuid) + value = getattr(query, "value", query) + return Balance.from_rao(cast(int, value)).set_unit(netuid=netuid) def get_root_claimable_rate( self, @@ -3456,7 +3481,8 @@ def get_root_claimable_all_rates( params=[hotkey_ss58], block_hash=self.determine_block_hash(block), ) - bits_list = next(iter(query.value)) + query_value = getattr(query, "value", query) + bits_list = next(iter(cast(list[list[tuple[int, FixedPoint]]], query_value))) return {bits[0]: fixed_to_float(bits[1], frac_bits=32) for bits in bits_list} def get_root_claimable_stake( @@ -3538,7 +3564,8 @@ def get_root_claimed( params=[netuid, hotkey_ss58, coldkey_ss58], block_hash=self.determine_block_hash(block), ) - return Balance.from_rao(query.value).set_unit(netuid=netuid) + value = getattr(query, "value", query) + return Balance.from_rao(cast(int, value)).set_unit(netuid=netuid) def get_stake( self, @@ -3569,11 +3596,14 @@ def get_stake( ) alpha_shares = alpha_shares_query - hotkey_alpha_obj: ScaleObj = self.query_module( - module="SubtensorModule", - name="TotalHotkeyAlpha", - block=block, - params=[hotkey_ss58, netuid], + hotkey_alpha_obj = cast( + ScaleObj, + self.query_module( + module="SubtensorModule", + name="TotalHotkeyAlpha", + block=block, + params=[hotkey_ss58, netuid], + ), ) hotkey_alpha = hotkey_alpha_obj.value @@ -3773,7 +3803,7 @@ def get_stake_weight(self, netuid: int, block: Optional[int] = None) -> list[flo params=[netuid], block_hash=block_hash, ) - return [u16_normalized_float(w) for w in result] + return [u16_normalized_float(w) for w in cast(list[int], result or [])] def get_start_call_delay(self, block: Optional[int] = None) -> int: """ @@ -3891,9 +3921,10 @@ def get_subnet_owner_hotkey( Returns: The hotkey of the subnet owner if available; `None` otherwise. """ - return self.query_subtensor( + query = self.query_subtensor( name="SubnetOwnerHotkey", params=[netuid], block=block ) + return cast(Optional[str], getattr(query, "value", query)) def get_subnet_price( self, @@ -4013,7 +4044,7 @@ def get_subnet_validator_permits( params=[netuid], block=block, ) - return query.value if query is not None and hasattr(query, "value") else query + return cast(Optional[list[bool]], getattr(query, "value", query)) def get_timelocked_weight_commits( self, @@ -4180,11 +4211,14 @@ def get_vote_data( This function is important for tracking and understanding the decision-making processes within the Bittensor network, particularly how proposals are received and acted upon by the governing body. """ - vote_data: dict[str, Any] = self.substrate.query( - module="Triumvirate", - storage_function="Voting", - params=[proposal_hash], - block_hash=self.determine_block_hash(block), + vote_data = cast( + Optional[dict[str, Any]], + self.substrate.query( + module="Triumvirate", + storage_function="Voting", + params=[proposal_hash], + block_hash=self.determine_block_hash(block), + ), ) if vote_data is None: @@ -4215,7 +4249,7 @@ def get_uid_for_hotkey_on_subnet( params=[netuid, hotkey_ss58], block_hash=self.determine_block_hash(block), ) - return getattr(result, "value", result) + return cast(Optional[int], getattr(result, "value", result)) def filter_netuids_by_registered_hotkeys( self, @@ -4690,8 +4724,13 @@ def query_identity( return None try: + identity_data = ( + identity_info.value + if hasattr(identity_info, "value") + else identity_info + ) return ChainIdentity.from_dict( - decode_hex_identity_dict(identity_info), + decode_hex_identity_dict(cast(dict[str, Any], identity_data)), ) except TypeError: return None @@ -4847,7 +4886,9 @@ def handler(block_data: dict): return True return None - current_block = self.substrate.get_block() + current_block = cast(Optional[dict[str, Any]], self.substrate.get_block()) + if current_block is None: + return False current_block_hash = current_block.get("header", {}).get("hash") if block is not None: target_block = block @@ -5120,8 +5161,8 @@ def sign_and_send_extrinsic( raise ChainError.from_error(response_error_message) extrinsic_response.success = False - extrinsic_response.message = format_error_message(response_error_message) - extrinsic_response.error = response_error_message + extrinsic_response.message = format_error_message(response_error_message) # type: ignore + extrinsic_response.error = response_error_message # type: ignore return extrinsic_response except SubstrateRequestException as error: diff --git a/tests/e2e_tests/test_root_claim.py b/tests/e2e_tests/test_root_claim.py index 08a928ce02..77a87b30a2 100644 --- a/tests/e2e_tests/test_root_claim.py +++ b/tests/e2e_tests/test_root_claim.py @@ -214,7 +214,8 @@ async def test_root_claim_swap_async( netuid=root_sn.netuid ) ) - await async_subtensor.wait_for_block(block=next_epoch_start_block) + await async_subtensor.wait_for_block(block=next_epoch_start_block + 4) + charlie_root_stake = await async_subtensor.staking.get_stake( coldkey_ss58=charlie_wallet.coldkey.ss58_address, hotkey_ss58=alice_wallet.hotkey.ss58_address, @@ -534,6 +535,8 @@ async def test_root_claim_keep_with_zero_num_root_auto_claims_async( ) assert response.success, response.message + await async_subtensor.wait_for_block(await async_subtensor.block + 4) + # === Check Charlie after manual claim === claimed_after_charlie = await async_subtensor.staking.get_root_claimed( coldkey_ss58=charlie_wallet.coldkey.ss58_address,