forked from opentensor/bittensor
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_subtensor.py
More file actions
5750 lines (5048 loc) · 234 KB
/
async_subtensor.py
File metadata and controls
5750 lines (5048 loc) · 234 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import copy
import ssl
from datetime import datetime, timezone
from functools import partial
from typing import cast, Optional, Any, Union, Iterable, TYPE_CHECKING
import asyncstdlib as a
import numpy as np
import scalecodec
from async_substrate_interface import AsyncSubstrateInterface
from async_substrate_interface.substrate_addons import RetryAsyncSubstrate
from async_substrate_interface.utils.storage import StorageKey
from bittensor_drand import get_encrypted_commitment
from bittensor_wallet.utils import SS58_FORMAT
from numpy.typing import NDArray
from scalecodec import GenericCall
from bittensor.core.chain_data import (
DelegateInfo,
DynamicInfo,
MetagraphInfo,
NeuronInfoLite,
NeuronInfo,
ProposalVoteData,
SelectiveMetagraphIndex,
StakeInfo,
SubnetHyperparameters,
SubnetIdentity,
SubnetInfo,
WeightCommitInfo,
decode_account_id,
)
from bittensor.core.chain_data.chain_identity import ChainIdentity
from bittensor.core.chain_data.delegate_info import DelegatedInfo
from bittensor.core.chain_data.utils import (
decode_block,
decode_metadata,
decode_revealed_commitment,
decode_revealed_commitment_with_hotkey,
)
from bittensor.core.config import Config
from bittensor.core.errors import ChainError, SubstrateRequestException
from bittensor.core.extrinsics.asyncex.children import (
root_set_pending_childkey_cooldown_extrinsic,
set_children_extrinsic,
)
from bittensor.core.extrinsics.asyncex.commit_reveal import commit_reveal_v3_extrinsic
from bittensor.core.extrinsics.asyncex.move_stake import (
transfer_stake_extrinsic,
swap_stake_extrinsic,
move_stake_extrinsic,
)
from bittensor.core.extrinsics.asyncex.registration import (
burned_register_extrinsic,
register_extrinsic,
register_subnet_extrinsic,
set_subnet_identity_extrinsic,
)
from bittensor.core.extrinsics.asyncex.root import (
set_root_weights_extrinsic,
root_register_extrinsic,
)
from bittensor.core.extrinsics.asyncex.serving import (
get_last_bonds_reset,
publish_metadata,
get_metadata,
)
from bittensor.core.extrinsics.asyncex.serving import serve_axon_extrinsic
from bittensor.core.extrinsics.asyncex.staking import (
add_stake_extrinsic,
add_stake_multiple_extrinsic,
)
from bittensor.core.extrinsics.asyncex.start_call import start_call_extrinsic
from bittensor.core.extrinsics.asyncex.take import (
decrease_take_extrinsic,
increase_take_extrinsic,
)
from bittensor.core.extrinsics.asyncex.transfer import transfer_extrinsic
from bittensor.core.extrinsics.asyncex.unstaking import (
unstake_all_extrinsic,
unstake_extrinsic,
unstake_multiple_extrinsic,
)
from bittensor.core.extrinsics.asyncex.weights import (
commit_weights_extrinsic,
set_weights_extrinsic,
reveal_weights_extrinsic,
)
from bittensor.core.metagraph import AsyncMetagraph
from bittensor.core.settings import version_as_int, TYPE_REGISTRY
from bittensor.core.types import ParamWithTypes, SubtensorMixin
from bittensor.utils import (
Certificate,
decode_hex_identity_dict,
format_error_message,
is_valid_ss58_address,
torch,
u16_normalized_float,
u64_normalized_float,
get_transfer_fn_params,
)
from bittensor.core.extrinsics.asyncex.liquidity import (
add_liquidity_extrinsic,
modify_liquidity_extrinsic,
remove_liquidity_extrinsic,
toggle_user_liquidity_extrinsic,
)
from bittensor.utils.balance import (
Balance,
fixed_to_float,
check_and_convert_to_balance,
)
from bittensor.utils import deprecated_message
from bittensor.utils.btlogging import logging
from bittensor.utils.liquidity import (
calculate_fees,
get_fees,
tick_to_price,
price_to_tick,
LiquidityPosition,
)
from bittensor.utils.weight_utils import (
generate_weight_hash,
convert_uids_and_weights,
U16_MAX,
)
if TYPE_CHECKING:
from async_substrate_interface.types import ScaleObj
from bittensor_wallet import Wallet
from bittensor.core.axon import Axon
from async_substrate_interface import AsyncQueryMapResult
class AsyncSubtensor(SubtensorMixin):
"""Asynchronous interface for interacting with the Bittensor blockchain.
This class provides a thin layer over the Substrate Interface, offering a collection of frequently-used calls for
querying blockchain data, managing stakes, registering neurons, and interacting with the Bittensor network.
"""
def __init__(
    self,
    network: Optional[str] = None,
    config: Optional["Config"] = None,
    log_verbose: bool = False,
    fallback_endpoints: Optional[list[str]] = None,
    retry_forever: bool = False,
    _mock: bool = False,
    archive_endpoints: Optional[list[str]] = None,
    websocket_shutdown_timer: float = 5.0,
):
    """Set up an AsyncSubtensor instance and its substrate client.

    The constructor is synchronous: it deep-copies the configuration, resolves the chain endpoint and network
    name through ``setup_config``, and builds the substrate client through ``_get_substrate``. Call
    ``initialize()`` (or use ``async with``) before issuing queries.

    Arguments:
        network: The network name or type to connect to (e.g., "finney", "test"). If ``None``, the value is taken
            from the configuration.
        config: Configuration object for the instance. If ``None``, ``AsyncSubtensor.config()`` is used. The
            object is deep-copied, so the caller's instance is not mutated.
        log_verbose: When ``True``, an info-level message is logged once the substrate client is created.
        fallback_endpoints: Fallback endpoints forwarded to the substrate client. Defaults to ``None``.
        retry_forever: Whether the substrate client should retry forever. Defaults to ``False``.
        _mock: Whether this is a mock instance. Mainly for testing purposes. Defaults to ``False``.
        archive_endpoints: Archive-node endpoints forwarded to the substrate client. Defaults to ``None``.
        websocket_shutdown_timer: Seconds forwarded to the substrate client as ``ws_shutdown_timer``. Defaults to
            ``5.0``.

    Returns:
        None

    Typical usage example:

        import bittensor as bt
        import asyncio

        async def main():
            async with bt.AsyncSubtensor(network="finney") as subtensor:
                block_hash = await subtensor.get_block_hash()

        asyncio.run(main())
    """
    # Work on a private copy so later changes never leak back into the caller's config object.
    source_config = AsyncSubtensor.config() if config is None else config
    self._config = copy.deepcopy(source_config)

    endpoint_and_network = AsyncSubtensor.setup_config(network, self._config)
    self.chain_endpoint, self.network = endpoint_and_network

    self.log_verbose = log_verbose
    self._check_and_log_network_settings()

    logging.debug(
        f"Connecting to network: [blue]{self.network}[/blue], "
        f"chain_endpoint: [blue]{self.chain_endpoint}[/blue]..."
    )

    substrate_options = {
        "fallback_endpoints": fallback_endpoints,
        "retry_forever": retry_forever,
        "_mock": _mock,
        "archive_endpoints": archive_endpoints,
        "ws_shutdown_timer": websocket_shutdown_timer,
    }
    self.substrate = self._get_substrate(**substrate_options)

    if not self.log_verbose:
        return
    logging.info(f"Connected to {self.network} network and {self.chain_endpoint}.")
async def close(self):
    """Close the substrate connection, if there is one.

    Lets callers release the connection explicitly instead of relying on garbage collection. Nothing happens
    when ``self.substrate`` is falsy.

    Returns:
        None

    Example:
        subtensor = AsyncSubtensor(network="finney")
        await subtensor.initialize()

        # Use the subtensor...
        balance = await subtensor.get_balance(address="5F...")

        # Close when done
        await subtensor.close()
    """
    connection = self.substrate
    if not connection:
        return
    await connection.close()
async def initialize(self):
    """Initialize the connection to the blockchain.

    Awaits ``self.substrate.initialize()`` and should be called after creating an AsyncSubtensor instance and
    before making any queries.

    Returns:
        AsyncSubtensor: The initialized instance (self) for method chaining.

    Raises:
        ConnectionError: If the substrate initialization times out, is refused, or fails with an SSL error. The
            original exception is attached as ``__cause__``.

    Example:
        subtensor = AsyncSubtensor(network="finney")

        # Initialize the connection
        await subtensor.initialize()

        # Now you can make queries
        balance = await subtensor.get_balance(address="5F...")

        # Or chain the initialization
        subtensor = await AsyncSubtensor(network="finney").initialize()
    """
    logging.info(
        f"[magenta]Connecting to Substrate:[/magenta] [blue]{self}[/blue][magenta]...[/magenta]"
    )
    try:
        await self.substrate.initialize()
    # ``asyncio.TimeoutError`` only became an alias of the builtin ``TimeoutError`` in Python 3.11; catch both so
    # timeouts raised by asyncio primitives are handled on older interpreters as well.
    except (TimeoutError, asyncio.TimeoutError) as error:
        logging.error(
            f"[red]Error[/red]: Timeout occurred connecting to substrate."
            f" Verify your chain and network settings: {self}"
        )
        raise ConnectionError(
            f"Timeout occurred connecting to substrate: {self}"
        ) from error
    except (ConnectionRefusedError, ssl.SSLError) as error:
        logging.error(
            f"[red]Error[/red]: Connection refused when connecting to substrate. "
            f"Verify your chain and network settings: {self}. Error: {error}"
        )
        raise ConnectionError(
            f"Connection refused when connecting to substrate: {self}. Error: {error}"
        ) from error
    return self
async def __aenter__(self):
logging.info(
f"[magenta]Connecting to Substrate:[/magenta] [blue]{self}[/blue][magenta]...[/magenta]"
)
try:
await self.substrate.initialize()
return self
except TimeoutError:
logging.error(
f"[red]Error[/red]: Timeout occurred connecting to substrate."
f" Verify your chain and network settings: {self}"
)
raise ConnectionError
except (ConnectionRefusedError, ssl.SSLError) as error:
logging.error(
f"[red]Error[/red]: Connection refused when connecting to substrate. "
f"Verify your chain and network settings: {self}. Error: {error}"
)
raise ConnectionError
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.substrate.close()
async def determine_block_hash(
    self,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> Optional[str]:
    """Determine the appropriate block hash based on the provided parameters.

    Ensures that at most one of the block specification parameters is used and returns the block hash to use for
    a query.

    Arguments:
        block: The block number to get the hash for. ``0`` is a valid block number. Do not specify if using
            block_hash or reuse_block.
        block_hash: The hash of the blockchain block. Do not specify if using block or reuse_block.
        reuse_block: Whether to reuse the last-used block hash. Do not set if using block or block_hash.

    Returns:
        Optional[str]: ``block_hash`` when it is given, the result of ``get_block_hash(block)`` when ``block`` is
            given, and ``None`` otherwise (including when only ``reuse_block`` is set).

    Raises:
        ValueError: If more than one of block, block_hash, or reuse_block is specified.

    Example:
        # Get hash for specific block
        block_hash = await subtensor.determine_block_hash(block=1000000)

        # Use provided block hash
        hash = await subtensor.determine_block_hash(block_hash="0x1234...")

        # Reuse last block hash
        hash = await subtensor.determine_block_hash(reuse_block=True)
    """
    # ``block`` must be compared against None: a plain truthiness test would silently ignore block number 0.
    block_given = block is not None

    # Ensure that only one of the parameters is specified.
    if sum((block_given, bool(block_hash), bool(reuse_block))) > 1:
        raise ValueError(
            "Only one of ``block``, ``block_hash``, or ``reuse_block`` can be specified."
        )

    # Return the appropriate value.
    if block_hash:
        return block_hash
    if block_given:
        return await self.get_block_hash(block)
    return None
async def encode_params(
    self,
    call_definition: dict[str, list["ParamWithTypes"]],
    params: Union[list[Any], dict[str, Any]],
) -> str:
    """Encode parameter values into a hex string according to a call definition.

    Each entry of ``call_definition["params"]`` is turned into a SCALE object of the entry's ``"type"``; the
    matching value is encoded with it and appended to one running byte buffer.

    Arguments:
        call_definition: A dictionary with a "params" key holding the list of parameter definitions. Each
            definition provides a "name" and a "type".
        params: The values to encode. A list is consumed positionally (the i-th value goes with the i-th
            definition); a dictionary is looked up by the definition's "name".

    Returns:
        str: A hex-encoded string representation of the parameters.

    Raises:
        ValueError: If ``params`` is a dictionary and a defined parameter name is missing from it.

    Example:
        # Define parameter types
        call_def = {
            "params": [
                {"name": "amount", "type": "u64"},
                {"name": "coldkey_ss58", "type": "str"}
            ]
        }

        # Encode parameters as a dictionary
        params_dict = {
            "amount": 1000000,
            "coldkey_ss58": "5F..."
        }
        encoded = await subtensor.encode_params(call_definition=call_def, params=params_dict)

        # Or encode as a list (positional)
        params_list = [1000000, "5F..."]
        encoded = await subtensor.encode_params(call_definition=call_def, params=params_list)
    """
    positional = isinstance(params, list)
    encoded = scalecodec.ScaleBytes(b"")

    for index, definition in enumerate(call_definition["params"]):
        codec = await self.substrate.create_scale_object(definition["type"])

        if positional:
            value = params[index]
        else:
            if definition["name"] not in params:
                raise ValueError(f"Missing param {definition['name']} in params dict.")
            value = params[definition["name"]]

        encoded += codec.encode(value)

    return encoded.to_hex()
async def get_hyperparameter(
    self,
    param_name: str,
    netuid: int,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> Optional[Any]:
    """Fetch one ``SubtensorModule`` storage value for a subnet.

    The subnet's existence is checked first; only then is the storage function named ``param_name`` queried with
    ``netuid`` as its single parameter.

    Arguments:
        param_name: The name of the hyperparameter to retrieve (e.g., "Difficulty", "Tempo", "ImmunityPeriod").
        netuid: The unique identifier of the subnet.
        block: The block number at which to retrieve the hyperparameter. Do not specify if using block_hash or
            reuse_block.
        block_hash: The hash of the blockchain block for the query. Do not specify if using block or reuse_block.
        reuse_block: Whether to reuse the last-used block hash. Do not set if using block_hash or block.

    Returns:
        The ``value`` attribute of the query result when it has one, otherwise the result itself. ``None`` (with
        an error logged) when the subnet does not exist.

    Example:
        # Get difficulty for subnet 1
        difficulty = await subtensor.get_hyperparameter(param_name="Difficulty", netuid=1)

        # Get tempo at a specific block
        tempo = await subtensor.get_hyperparameter(param_name="Tempo", netuid=1, block=1000000)

        # Get immunity period using block hash
        immunity = await subtensor.get_hyperparameter(param_name="ImmunityPeriod", netuid=1, block_hash="0x1234...")
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)

    subnet_found = await self.subnet_exists(
        netuid, block_hash=resolved_hash, reuse_block=reuse_block
    )
    if subnet_found:
        response = await self.substrate.query(
            module="SubtensorModule",
            storage_function=param_name,
            params=[netuid],
            block_hash=resolved_hash,
            reuse_block_hash=reuse_block,
        )
        # Unwrap scale objects, but pass plain values through untouched.
        return getattr(response, "value", response)

    logging.error(f"subnet {netuid} does not exist")
    return None
def _get_substrate(
    self,
    fallback_endpoints: Optional[list[str]] = None,
    retry_forever: bool = False,
    _mock: bool = False,
    archive_endpoints: Optional[list[str]] = None,
    ws_shutdown_timer: float = 5.0,
) -> Union[AsyncSubstrateInterface, RetryAsyncSubstrate]:
    """Create the substrate client matching the requested resilience options.

    A ``RetryAsyncSubstrate`` is built when any of ``fallback_endpoints``, ``retry_forever`` or
    ``archive_endpoints`` is truthy; otherwise a plain ``AsyncSubstrateInterface`` is built. Both receive the
    same endpoint, SS58 format, type registry, chain name, mock flag and shutdown timer.

    Arguments:
        fallback_endpoints: Passed to ``RetryAsyncSubstrate`` as ``fallback_chains``. Defaults to ``None``.
        retry_forever: Passed to ``RetryAsyncSubstrate`` as ``retry_forever``. Defaults to ``False``.
        _mock: Whether this is a mock instance. Mainly for testing purposes. Defaults to ``False``.
        archive_endpoints: Passed to ``RetryAsyncSubstrate`` as ``archive_nodes``. Defaults to ``None``.
        ws_shutdown_timer: Seconds passed to the client as ``ws_shutdown_timer``. Defaults to ``5.0``.

    Returns:
        Either AsyncSubstrateInterface or RetryAsyncSubstrate.
    """
    # Options understood by both client classes.
    shared_options = {
        "url": self.chain_endpoint,
        "ss58_format": SS58_FORMAT,
        "type_registry": TYPE_REGISTRY,
        "use_remote_preset": True,
        "chain_name": "Bittensor",
        "_mock": _mock,
        "ws_shutdown_timer": ws_shutdown_timer,
    }

    wants_retry_client = bool(fallback_endpoints or retry_forever or archive_endpoints)
    if not wants_retry_client:
        return AsyncSubstrateInterface(**shared_options)

    return RetryAsyncSubstrate(
        fallback_chains=fallback_endpoints,
        retry_forever=retry_forever,
        archive_nodes=archive_endpoints,
        **shared_options,
    )
# Subtensor queries ===========================================================================================
async def query_constant(
    self,
    module_name: str,
    constant_name: str,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> Optional["ScaleObj"]:
    """Retrieve a constant from the specified module on the Bittensor blockchain.

    Resolves the block hash for the request and forwards the lookup to ``substrate.get_constant``.

    Arguments:
        module_name: The name of the module containing the constant (e.g., "Balances", "SubtensorModule").
        constant_name: The name of the constant to retrieve (e.g., "ExistentialDeposit").
        block: The blockchain block number at which to query the constant. Do not specify if using block_hash or
            reuse_block.
        block_hash: The hash of the blockchain block at which to query the constant. Do not specify if using
            block or reuse_block.
        reuse_block: Whether to reuse the last-used block for the query. Defaults to ``False``.

    Returns:
        Optional[async_substrate_interface.types.ScaleObj]: Whatever ``substrate.get_constant`` returns for the
            requested constant.

    Example:
        # Get existential deposit constant
        existential_deposit = await subtensor.query_constant(
            module_name="Balances",
            constant_name="ExistentialDeposit"
        )

        # Get constant at specific block
        constant = await subtensor.query_constant(
            module_name="SubtensorModule",
            constant_name="SomeConstant",
            block=1000000
        )
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    constant = await self.substrate.get_constant(
        module_name=module_name,
        constant_name=constant_name,
        block_hash=resolved_hash,
        reuse_block_hash=reuse_block,
    )
    return constant
async def query_map(
    self,
    module: str,
    name: str,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
    params: Optional[list] = None,
) -> "AsyncQueryMapResult":
    """Query map storage from any module on the Bittensor blockchain.

    Resolves the block hash for the request and forwards the call to ``substrate.query_map``.

    Arguments:
        module: The name of the module from which to query the map storage (e.g., "SubtensorModule", "System").
        name: The specific storage function within the module to query (e.g., "Bonds", "Weights").
        block: The blockchain block number at which to perform the query. Do not specify if using block_hash or
            reuse_block.
        block_hash: The hash of the block to retrieve the parameter from. Do not specify if using block or
            reuse_block.
        reuse_block: Whether to use the last-used block. Do not set if using block_hash or block. Defaults to
            ``False``.
        params: Parameters to be passed to the query. Defaults to ``None``.

    Returns:
        AsyncQueryMapResult: The object returned by ``substrate.query_map``.

    Example:
        # Query bonds for subnet 1
        bonds = await subtensor.query_map(module="SubtensorModule", name="Bonds", params=[1])

        # Query weights at specific block
        weights = await subtensor.query_map(module="SubtensorModule", name="Weights", params=[1], block=1000000)
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    return await self.substrate.query_map(
        module=module,
        storage_function=name,
        params=params,
        block_hash=resolved_hash,
        reuse_block_hash=reuse_block,
    )
async def query_map_subtensor(
    self,
    name: str,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
    params: Optional[list] = None,
) -> "AsyncQueryMapResult":
    """Query map storage from the ``SubtensorModule`` pallet.

    Same as a map query against an arbitrary module, with the module fixed to "SubtensorModule".

    Arguments:
        name: The name of the map storage function to query.
        block: The blockchain block number at which to perform the query. Do not specify if using block_hash or
            reuse_block.
        block_hash: The hash of the block to retrieve the parameter from. Do not specify if using block or
            reuse_block.
        reuse_block: Whether to use the last-used block. Do not set if using block_hash or block.
        params: A list of parameters to pass to the query function.

    Returns:
        The object returned by ``substrate.query_map`` for the requested storage function.
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    request = {
        "module": "SubtensorModule",
        "storage_function": name,
        "params": params,
        "block_hash": resolved_hash,
        "reuse_block_hash": reuse_block,
    }
    return await self.substrate.query_map(**request)
async def query_module(
    self,
    module: str,
    name: str,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
    params: Optional[list] = None,
) -> Optional[Union["ScaleObj", Any]]:
    """Query a storage function of any module on the Bittensor blockchain.

    Resolves the block hash for the request and forwards the call to ``substrate.query``.

    Arguments:
        module: The name of the module from which to query data.
        name: The name of the storage function within the module.
        block: The blockchain block number at which to perform the query. Do not specify if using block_hash or
            reuse_block.
        block_hash: The hash of the block to retrieve the parameter from. Do not specify if using block or
            reuse_block.
        reuse_block: Whether to use the last-used block. Do not set if using block_hash or block.
        params: A list of parameters to pass to the query function.

    Returns:
        The object returned by ``substrate.query`` for the requested storage function.
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    response = await self.substrate.query(
        module=module,
        storage_function=name,
        params=params,
        block_hash=resolved_hash,
        reuse_block_hash=reuse_block,
    )
    return response
async def query_runtime_api(
    self,
    runtime_api: str,
    method: str,
    params: Optional[Union[list[Any], dict[str, Any]]],
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> Optional[Any]:
    """Call a runtime API method and return the ``value`` of its result.

    Resolves the block hash for the request. When no hash results and ``reuse_block`` is set, the substrate
    client's ``last_block_hash`` is used instead. The call is then made through ``substrate.runtime_call``.

    Arguments:
        runtime_api: The name of the runtime API to query.
        method: The specific method within the runtime API to call.
        params: The parameters to pass to the method call.
        block: The block number for this query. Do not specify if using block_hash or reuse_block.
        block_hash: The hash of the blockchain block at which to perform the query. Do not specify if using
            block or reuse_block.
        reuse_block: Whether to reuse the last-used block hash. Do not set if using block_hash or block.

    Returns:
        The ``value`` attribute of the object returned by ``substrate.runtime_call``.
    """
    target_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    if reuse_block and not target_hash:
        # ``runtime_call`` is given an explicit hash, so the reuse request is translated here.
        target_hash = self.substrate.last_block_hash

    response = await self.substrate.runtime_call(
        runtime_api, method, params, target_hash
    )
    return response.value
async def query_subtensor(
    self,
    name: str,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
    params: Optional[list] = None,
) -> Optional[Union["ScaleObj", Any]]:
    """Query a named storage function of the ``SubtensorModule`` pallet.

    Same as a storage query against an arbitrary module, with the module fixed to "SubtensorModule".

    Arguments:
        name: The name of the storage function to query.
        block: The blockchain block number at which to perform the query. Do not specify if using block_hash or
            reuse_block.
        block_hash: The hash of the block to retrieve the parameter from. Do not specify if using block or
            reuse_block.
        reuse_block: Whether to use the last-used block. Do not set if using block_hash or block.
        params: A list of parameters to pass to the query function.

    Returns:
        query_response: The object returned by ``substrate.query`` for the requested storage function.
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    request = {
        "module": "SubtensorModule",
        "storage_function": name,
        "params": params,
        "block_hash": resolved_hash,
        "reuse_block_hash": reuse_block,
    }
    return await self.substrate.query(**request)
async def state_call(
    self,
    method: str,
    data: str,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> dict[Any, Any]:
    """Issue a ``state_call`` RPC request against the chain.

    Resolves the block hash for the request and sends ``[method, data]`` as the RPC parameters through
    ``substrate.rpc_request``.

    Arguments:
        method: The method name for the state call.
        data: The data to be passed to the method.
        block: The blockchain block number at which to perform the state call. Do not specify if using
            block_hash or reuse_block.
        block_hash: The hash of the block to retrieve the parameter from. Do not specify if using block or
            reuse_block.
        reuse_block: Whether to use the last-used block. Do not set if using block_hash or block.

    Returns:
        result (dict[Any, Any]): The result of the rpc call.
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    rpc_params = [method, data]
    response = await self.substrate.rpc_request(
        method="state_call",
        params=rpc_params,
        block_hash=resolved_hash,
        reuse_block_hash=reuse_block,
    )
    return response
# Common subtensor methods =========================================================================================
@property
async def block(self):
    """Awaitable property yielding the result of ``get_current_block()``.

    Because the getter is a coroutine function, the attribute must be awaited: ``current = await subtensor.block``.
    """
    current_block = await self.get_current_block()
    return current_block
async def all_subnets(
    self,
    block_number: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> Optional[list[DynamicInfo]]:
    """Query the dynamic information of every subnet, enriched with subnet prices when available.

    The ``SubnetInfoRuntimeApi.get_all_dynamic_info`` runtime call and the subnet price lookup run concurrently.
    The runtime call is mandatory; the price lookup is best-effort for the failure types listed below.

    Arguments:
        block_number: The block number to query the subnet information from. Do not specify if using block_hash or
            reuse_block.
        block_hash: The hash of the blockchain block for the query. Do not specify if using reuse_block or
            block_number.
        reuse_block: Whether to reuse the last-used blockchain block hash. Do not set if using block_hash or
            block_number.

    Returns:
        Optional[list[DynamicInfo]]: The result of ``DynamicInfo.list_from_dicts`` over the decoded runtime
            response. Each entry carries a "price" key unless the price lookup failed with
            ``SubstrateRequestException`` or ``ValueError``, in which case a warning is logged and prices are
            omitted.

    Raises:
        BaseException: Whatever the runtime call raised, re-raised unchanged. Any price-lookup failure other
            than ``SubstrateRequestException`` / ``ValueError`` is re-raised unchanged as well.

    Example:
        # Get all subnets at current block
        subnets = await subtensor.all_subnets()
    """
    block_hash = await self.determine_block_hash(
        block=block_number, block_hash=block_hash, reuse_block=reuse_block
    )
    if not block_hash and reuse_block:
        block_hash = self.substrate.last_block_hash

    # ``return_exceptions=True`` keeps a price failure from cancelling the main query, but it also means
    # failures come back as ordinary values and must be inspected explicitly.
    query, subnet_prices = await asyncio.gather(
        self.substrate.runtime_call(
            api="SubnetInfoRuntimeApi",
            method="get_all_dynamic_info",
            block_hash=block_hash,
        ),
        self.get_subnet_prices(block_hash=block_hash),
        return_exceptions=True,
    )

    # Without this check a failed runtime call would surface as an unrelated AttributeError on ``.decode()``.
    if isinstance(query, BaseException):
        raise query
    decoded = query.decode()

    if isinstance(subnet_prices, (SubstrateRequestException, ValueError)):
        logging.warning(
            f"Unable to fetch subnet prices for block {block_number}, block hash {block_hash}: {subnet_prices}"
        )
    elif isinstance(subnet_prices, BaseException):
        # Unexpected failure: surface the real error instead of an AttributeError on ``.get``.
        raise subnet_prices
    else:
        for sn in decoded:
            sn.update(
                {"price": subnet_prices.get(sn["netuid"], Balance.from_tao(0))}
            )
    return DynamicInfo.list_from_dicts(decoded)
async def blocks_since_last_step(
    self,
    netuid: int,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> Optional[int]:
    """Reads the ``BlocksSinceLastStep`` storage entry for a subnet, i.e. how many blocks have passed since the
    subnet's last epoch step.

    Arguments:
        netuid: The unique identifier of the subnetwork.
        block: The block number for this query. Do not specify if using block_hash or reuse_block.
        block_hash: The hash of the blockchain block number for the query. Do not specify if using reuse_block or
            block.
        reuse_block: Whether to reuse the last-used blockchain block hash. Do not set if using block_hash or block.

    Returns:
        The ``value`` attribute of the query result when it has one; otherwise the query result itself, which may
            be None.

    Example:
        # Get blocks since last step for subnet 1
        blocks = await subtensor.blocks_since_last_step(netuid=1)

        # Get blocks since last step at specific block
        blocks = await subtensor.blocks_since_last_step(netuid=1, block=1000000)
    """
    result = await self.query_subtensor(
        name="BlocksSinceLastStep",
        params=[netuid],
        block=block,
        block_hash=block_hash,
        reuse_block=reuse_block,
    )
    # Unwrap `.value` when the result carries one; None and plain values pass through untouched.
    return getattr(result, "value", result)
async def blocks_since_last_update(self, netuid: int, uid: int) -> Optional[int]:
    """Returns the number of blocks since the last update, or ``None`` if the subnetwork or UID does not exist.

    The ``LastUpdate`` hyperparameter of the subnet is fetched and indexed by ``uid``; the result is the current
    block number minus that entry.

    Arguments:
        netuid: The unique identifier of the subnetwork.
        uid: The unique identifier of the neuron.

    Returns:
        Optional[int]: The number of blocks since the last update, or None if the ``LastUpdate`` lookup returns
            None or ``uid`` is negative or beyond the end of the returned sequence.

    Example:
        # Get blocks since last update for UID 5 in subnet 1
        blocks = await subtensor.blocks_since_last_update(netuid=1, uid=5)

        # Check if neuron needs updating
        blocks_since_update = await subtensor.blocks_since_last_update(netuid=1, uid=10)
    """
    last_updates = await self.get_hyperparameter(param_name="LastUpdate", netuid=netuid)
    # A negative uid would silently index from the end of the sequence and report another
    # neuron's value, so treat it as a non-existent UID.
    if last_updates is None or uid < 0:
        return None
    try:
        last_update_block = int(last_updates[uid])
    except IndexError:
        # uid is past the end of the subnet's LastUpdate entries: the UID does not exist.
        return None
    return await self.get_current_block() - last_update_block
async def bonds(
    self,
    netuid: int,
    block: Optional[int] = None,
    block_hash: Optional[str] = None,
    reuse_block: bool = False,
) -> list[tuple[int, list[tuple[int, int]]]]:
    """Retrieves the bond distribution set by subnet validators within a specific subnet.

    Bonds represent the "investment" a subnet validator has made in evaluating a specific subnet miner. This
    bonding mechanism is integral to the Yuma Consensus' design intent of incentivizing high-quality performance
    by subnet miners, and honest evaluation by subnet validators.

    The ``SubtensorModule.Bonds`` storage map is iterated for the given subnet; entries whose value is None are
    left out of the result.

    Arguments:
        netuid: The unique identifier of the subnet.
        block: The block number for this query. Do not specify if using block_hash or reuse_block.
        block_hash: The hash of the block for the query. Do not specify if using reuse_block or block.
        reuse_block: Whether to reuse the last-used block hash. Do not set if using block_hash or block.

    Returns:
        List of ``(uid, bonds)`` tuples mapping each neuron's UID to its bonds with other neurons.

    Example:
        # Get bonds for subnet 1 at block 1000000
        bonds = await subtensor.bonds(netuid=1, block=1000000)

    Notes:
        - See <https://docs.learnbittensor.org/glossary#validator-miner-bonds>
        - See <https://docs.learnbittensor.org/glossary#yuma-consensus>
    """
    resolved_hash = await self.determine_block_hash(block, block_hash, reuse_block)
    bond_entries = await self.substrate.query_map(
        module="SubtensorModule",
        storage_function="Bonds",
        params=[netuid],
        block_hash=resolved_hash,
        reuse_block_hash=reuse_block,
    )
    # Collect (uid, bonds) pairs, skipping storage entries that carry no value.
    return [
        (uid, entry.value)
        async for uid, entry in bond_entries
        if entry.value is not None
    ]
async def commit(
    self, wallet: "Wallet", netuid: int, data: str, period: Optional[int] = None
) -> bool:
    """Commits arbitrary data to the Bittensor network by publishing metadata.

    This method allows neurons to publish arbitrary data to the blockchain, which can be used for various purposes
    such as sharing model updates, configuration data, or other network-relevant information.

    The string is encoded to bytes once and published through ``publish_metadata`` with a ``Raw{N}`` data type,
    where ``N`` is the length of the encoded payload in bytes (not the number of characters), so the declared
    size always matches the bytes actually sent, including for non-ASCII text.

    Arguments:
        wallet: The wallet associated with the neuron committing the data.
        netuid: The unique identifier of the subnet.
        data: The data to be committed to the network.
        period: The number of blocks during which the transaction will remain valid after it's submitted. If the
            transaction is not included in a block within that number of blocks, it will expire and be rejected. You
            can think of it as an expiration date for the transaction.

    Returns:
        bool: True if the commit was successful, False otherwise.

    Example:
        # Commit some data to subnet 1
        success = await subtensor.commit(wallet=my_wallet, netuid=1, data="Hello Bittensor!")

        # Commit with custom period
        success = await subtensor.commit(wallet=my_wallet, netuid=1, data="Model update v2.0", period=100)

    Note: See <https://docs.learnbittensor.org/glossary#commit-reveal>
    """
    # Encode first: multi-byte characters make the byte length differ from len(data), and the
    # Raw{N} type name must describe the payload that is actually submitted.
    payload = data.encode()
    return await publish_metadata(
        subtensor=self,
        wallet=wallet,
        netuid=netuid,
        data_type=f"Raw{len(payload)}",
        data=payload,
        period=period,
    )


# Alias: `set_commitment` is the same callable as `commit`.
set_commitment = commit
async def commit_reveal_enabled(
self,
netuid: int,
block: Optional[int] = None,
block_hash: Optional[str] = None,
reuse_block: bool = False,
) -> bool:
"""Check if commit-reveal mechanism is enabled for a given subnet at a specific block.
The commit reveal feature is designed to solve the weight-copying problem by giving would-be weight-copiers
access only to stale weights. Copying stale weights should result in subnet validators departing from consensus.
Arguments:
netuid: The unique identifier of the subnet for which to check the commit-reveal mechanism.
block: The block number to query. Do not specify if using block_hash or reuse_block.
block_hash: The block hash at which to check the parameter. Do not set if using block or reuse_block.
reuse_block: Whether to reuse the last-used block hash. Do not set if using block_hash or block.
Returns:
bool: True if commit-reveal mechanism is enabled, False otherwise.
Example:
# Check if commit-reveal is enabled for subnet 1