diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 3b96e66..5899057 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -23,7 +23,7 @@ ) from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string -from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject +from scalecodec.base import ScaleBytes, ScaleType from scalecodec.types import ( GenericCall, GenericExtrinsic, @@ -34,13 +34,11 @@ from websockets.asyncio.client import connect from websockets.exceptions import ConnectionClosed, WebSocketException -from async_substrate_interface.const import SS58_FORMAT from async_substrate_interface.errors import ( SubstrateRequestException, ExtrinsicNotFound, BlockNotFound, MaxRetriesExceeded, - MetadataAtVersionNotFound, StateDiscardedError, ) from async_substrate_interface.protocols import Keypair @@ -65,6 +63,7 @@ from async_substrate_interface.utils.decoding import ( _determine_if_old_runtime_call, _bt_decode_to_dict_or_list, + legacy_scale_decode, ) from async_substrate_interface.utils.storage import StorageKey from async_substrate_interface.type_registry import _TYPE_REGISTRY @@ -755,6 +754,9 @@ def __init__( ws_shutdown_timer: how long after the last connection your websocket should close """ + super().__init__( + type_registry, type_registry_preset, use_remote_preset, ss58_format + ) self.max_retries = max_retries self.retry_timeout = retry_timeout self.chain_endpoint = url @@ -783,19 +785,12 @@ } self.initialized = False self._forgettable_task = None - self.ss58_format = ss58_format self.type_registry = type_registry self.type_registry_preset = type_registry_preset self.runtime_cache = RuntimeCache() - self.runtime_config = RuntimeConfigurationObject( - ss58_format=self.ss58_format, implements_scale_info=True - ) self._nonces = {} self.metadata_version_hex = "0x0f000000" # v15 - self.reload_type_registry() self._initializing = False - self.registry_type_map = {} - self.type_id_to_name = {} self._mock = _mock async def __aenter__(self): @@ -812,13 +807,50 @@ async def initialize(self): if not self._chain: chain = await self.rpc_request("system_chain", []) self._chain = chain.get("result") - await self.init_runtime() + runtime = await self.init_runtime() + if self.ss58_format is None: + # Check and apply runtime constants + ss58_prefix_constant = await self.get_constant( + "System", "SS58Prefix", runtime=runtime + ) + + if ss58_prefix_constant: + self.ss58_format = ss58_prefix_constant.value self.initialized = True self._initializing = False async def __aexit__(self, exc_type, exc_val, exc_tb): pass + @property + def metadata(self): + warnings.warn( + "Calling AsyncSubstrateInterface.metadata is deprecated, as metadata is runtime-dependent, and it " + "can be unclear which runtime's metadata you seek. You should instead use the specific runtime's " + "metadata. For now, the most recently used runtime will be given.", + category=DeprecationWarning, + ) + runtime = self.runtime_cache.last_used + if not runtime or runtime.metadata is None: + raise AttributeError( + "Metadata not found. This generally indicates that the AsyncSubstrateInterface object " + "is not properly async initialized." + ) + else: + return runtime.metadata + + @property + def implements_scaleinfo(self) -> Optional[bool]: + """ + Returns True if the most-recently-used runtime implements a `PortableRegistry` (`MetadataV14` and higher). 
Returns + `None` if no runtime has been loaded. + """ + runtime = self.runtime_cache.last_used + if runtime is not None: + return runtime.implements_scaleinfo + else: + return None + @property async def properties(self): if self._properties is None: @@ -857,8 +889,8 @@ async def name(self): async def get_storage_item( self, module: str, storage_function: str, block_hash: str = None ): - await self.init_runtime(block_hash=block_hash) - metadata_pallet = self.runtime.metadata.get_metadata_pallet(module) + runtime = await self.init_runtime(block_hash=block_hash) + metadata_pallet = runtime.metadata.get_metadata_pallet(module) storage_item = metadata_pallet.get_storage_function(storage_function) return storage_item @@ -875,7 +907,7 @@ async def _get_current_block_hash( async def _load_registry_at_block( self, block_hash: Optional[str] - ) -> tuple[MetadataV15, PortableRegistry]: + ) -> tuple[Optional[MetadataV15], Optional[PortableRegistry]]: # Should be called for any block that fails decoding. # Possibly the metadata was different. try: @@ -889,59 +921,38 @@ async def _load_registry_at_block( "Client error: Execution failed: Other: Exported method Metadata_metadata_at_version is not found" in e.args ): - raise MetadataAtVersionNotFound + return None, None else: raise e metadata_option_hex_str = metadata_rpc_result["result"] metadata_option_bytes = bytes.fromhex(metadata_option_hex_str[2:]) metadata = MetadataV15.decode_from_metadata_option(metadata_option_bytes) registry = PortableRegistry.from_metadata_v15(metadata) - self._load_registry_type_map(registry) return metadata, registry - async def _wait_for_registry(self, _attempt: int = 1, _retries: int = 3) -> None: - async def _waiter(): - while self.runtime.registry is None: - await asyncio.sleep(0.1) - return - - try: - if not self.runtime.registry: - await asyncio.wait_for(_waiter(), timeout=10) - except TimeoutError: - # indicates that registry was never loaded - if not self._initializing: - raise AttributeError( - "Registry was never loaded. This did not occur during initialization, which usually indicates " - "you must first initialize the AsyncSubstrateInterface object, either with " - "`await AsyncSubstrateInterface.initialize()` or running with `async with`" - ) - elif _attempt < _retries: - await self._load_registry_at_block(None) - return await self._wait_for_registry(_attempt + 1, _retries) - else: - raise AttributeError( - "Registry was never loaded. This occurred during initialization, which usually indicates a " - "connection or node error." - ) - async def encode_scale( - self, type_string, value: Any, _attempt: int = 1, _retries: int = 3 + self, + type_string, + value: Any, + block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, ) -> bytes: """ - Helper function to encode arbitrary data into SCALE-bytes for given RUST type_string + Helper function to encode arbitrary data into SCALE-bytes for given RUST type_string. If neither `block_hash` + nor `runtime` are supplied, the runtime of the current block will be used. Args: type_string: the type string of the SCALE object for decoding value: value to encode - _attempt: the current number of attempts to load the registry needed to encode the value - _retries: the maximum number of attempts to load the registry needed to encode the value + block_hash: hash of the block where the desired runtime is located. Ignored if supplying `runtime` + runtime: the runtime to use for the scale encoding. 
If supplied, `block_hash` is ignored Returns: encoded bytes """ - await self._wait_for_registry(_attempt, _retries) - return self._encode_scale(type_string, value) + if runtime is None: + runtime = await self.init_runtime(block_hash=block_hash) + return self._encode_scale(type_string, value, runtime=runtime) async def decode_scale( self, @@ -950,6 +961,9 @@ _attempt=1, _retries=3, return_scale_obj: bool = False, + block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, + force_legacy: bool = False, ) -> Union[ScaleObj, Any]: """ Helper function to decode arbitrary SCALE-bytes (e.g. 0x02000000) according to given RUST type_string @@ -962,6 +976,10 @@ _attempt: the number of attempts to pull the registry before timing out _retries: the number of retries to pull the registry before timing out return_scale_obj: Whether to return the decoded value wrapped in a SCALE-object-like wrapper, or raw. + block_hash: Hash of the block where the desired runtime is located. Ignored if supplying `runtime` + runtime: Optional Runtime object whose registry to use for decoding. If not specified, runtime will be + loaded based on the block hash specified (or latest block if no block_hash is specified) + force_legacy: Whether to explicitly use legacy Python-only decoding (non bt-decode). Returns: Decoded object """ return None if type_string == "scale_info::0": # Is an AccountId # Decode AccountId bytes to SS58 address - return ss58_encode(scale_bytes, SS58_FORMAT) + return ss58_encode(scale_bytes, self.ss58_format) else: - await self._wait_for_registry(_attempt, _retries) - obj = decode_by_type_string(type_string, self.runtime.registry, scale_bytes) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) + if runtime.metadata_v15 is not None and not force_legacy: + obj = decode_by_type_string(type_string, runtime.registry, scale_bytes) + else: + obj = legacy_scale_decode(type_string, scale_bytes, runtime) if return_scale_obj: return ScaleObj(obj) else: return obj - def load_runtime(self, runtime): - self.runtime = runtime - - # Update type registry - self.reload_type_registry(use_remote_preset=False, auto_discover=True) - - self.runtime_config.set_active_spec_version_id(runtime.runtime_version) - if self.implements_scaleinfo: - logger.debug("Add PortableRegistry from metadata to type registry") - self.runtime_config.add_portable_registry(runtime.metadata) - # Set runtime compatibility flags - try: - _ = self.runtime_config.create_scale_object("sp_weights::weight_v2::Weight") - self.config["is_weight_v2"] = True - self.runtime_config.update_type_registry_types( - {"Weight": "sp_weights::weight_v2::Weight"} - ) - except NotImplementedError: - self.config["is_weight_v2"] = False - self.runtime_config.update_type_registry_types({"Weight": "WeightV1"}) - async def init_runtime( self, block_hash: Optional[str] = None, block_id: Optional[int] = None ) -> Runtime: @@ -1023,10 +1024,16 @@ raise ValueError("Cannot provide block_hash and block_id at the same time") if block_id is not None: + if runtime := self.runtime_cache.retrieve(block=block_id): + return runtime block_hash = await self.get_block_hash(block_id) if not block_hash: block_hash = await self.get_chain_head() + else: + self.last_block_hash = block_hash + if runtime := self.runtime_cache.retrieve(block_hash=block_hash): + return runtime runtime_version = await self.get_block_runtime_version_for(block_hash) 
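To make the new flow concrete, here is a minimal sketch of the runtime-pinning pattern this hunk introduces — the endpoint URL and the `u32` type strings are illustrative placeholders, not part of the diff:

```python
import asyncio

from async_substrate_interface.async_substrate import AsyncSubstrateInterface


async def main():
    # Placeholder endpoint; substitute a real Substrate node URL.
    async with AsyncSubstrateInterface("wss://node.example:443") as substrate:
        block_hash = await substrate.get_chain_head()
        # init_runtime now returns the Runtime for the block, served from the
        # RuntimeCache by block number, block hash, or runtime version.
        runtime = await substrate.init_runtime(block_hash=block_hash)
        # Passing the pinned runtime skips a per-call init_runtime lookup.
        encoded = await substrate.encode_scale("u32", 2, runtime=runtime)
        decoded = await substrate.decode_scale("u32", encoded, runtime=runtime)
        assert decoded == 2


asyncio.run(main())
```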
if runtime_version is None: @@ -1034,26 +1041,10 @@ async def init_runtime( f"No runtime information for block '{block_hash}'" ) - if self.runtime and runtime_version == self.runtime.runtime_version: - return self.runtime - - runtime = self.runtime_cache.retrieve(runtime_version=runtime_version) - if not runtime: - self.last_block_hash = block_hash - - runtime = await self.get_runtime_for_version(runtime_version, block_hash) - - self.load_runtime(runtime) - - if self.ss58_format is None: - # Check and apply runtime constants - ss58_prefix_constant = await self.get_constant( - "System", "SS58Prefix", block_hash=block_hash - ) - - if ss58_prefix_constant: - self.ss58_format = ss58_prefix_constant - return runtime + if runtime := self.runtime_cache.retrieve(runtime_version=runtime_version): + return runtime + else: + return await self.get_runtime_for_version(runtime_version, block_hash) @cached_fetcher(max_size=16, cache_key_index=0) async def get_runtime_for_version( @@ -1073,7 +1064,17 @@ async def get_runtime_for_version( async def _get_runtime_for_version( self, runtime_version: int, block_hash: Optional[str] = None ) -> Runtime: - runtime_block_hash = await self.get_parent_block_hash(block_hash) + if not block_hash: + block_hash, runtime_block_hash, block_number = await asyncio.gather( + self.get_chain_head(), + self.get_parent_block_hash(block_hash), + self.get_block_number(block_hash), + ) + else: + runtime_block_hash, block_number = await asyncio.gather( + self.get_parent_block_hash(block_hash), + self.get_block_number(block_hash), + ) runtime_info, metadata, (metadata_v15, registry) = await asyncio.gather( self.get_block_runtime_info(runtime_block_hash), self.get_block_metadata(block_hash=runtime_block_hash, decode=True), @@ -1084,20 +1085,33 @@ async def _get_runtime_for_version( raise SubstrateRequestException( f"No metadata for block '{runtime_block_hash}'" ) - logger.debug( - f"Retrieved metadata and metadata v15 for {runtime_version} from Substrate node" - ) - + if metadata_v15 is not None: + logger.debug( + f"Retrieved metadata and metadata v15 for {runtime_version} from Substrate node" + ) + else: + logger.debug( + f"Exported method Metadata_metadata_at_version is not found for {runtime_version}. This indicates the " + f"block is quite old, decoding for this block will use legacy Python decoding." 
+ ) + implements_scale_info = metadata.portable_registry is not None runtime = Runtime( chain=self.chain, - runtime_config=self.runtime_config, + runtime_config=self._runtime_config_copy( + implements_scale_info=implements_scale_info + ), metadata=metadata, type_registry=self.type_registry, metadata_v15=metadata_v15, runtime_info=runtime_info, registry=registry, ) - self.runtime_cache.add_item(runtime_version=runtime_version, runtime=runtime) + self.runtime_cache.add_item( + block=block_number, + block_hash=block_hash, + runtime_version=runtime_version, + runtime=runtime, + ) return runtime async def create_storage_key( @@ -1119,14 +1133,14 @@ async def create_storage_key( Returns: StorageKey """ - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) return StorageKey.create_from_storage_function( pallet, storage_function, params, - runtime_config=self.runtime_config, - metadata=self.runtime.metadata, + runtime_config=runtime.runtime_config, + metadata=runtime.metadata, ) async def subscribe_storage( @@ -1158,7 +1172,7 @@ async def subscription_handler(storage_key, obj, subscription_id): subscription_handler: coroutine function to handle value changes of subscription """ - await self.init_runtime() + runtime = await self.init_runtime() storage_key_map = {s.to_hex(): s for s in storage_keys} @@ -1202,6 +1216,7 @@ async def result_handler( updated_obj = await self.decode_scale( type_string=change_scale_type, scale_bytes=hex_to_bytes(change_data), + runtime=runtime, ) subscription_result = await subscription_handler( @@ -1253,36 +1268,45 @@ async def retrieve_pending_extrinsics(self) -> list: return extrinsics - async def get_metadata_storage_functions(self, block_hash=None) -> list: + async def get_metadata_storage_functions( + self, block_hash=None, runtime: Optional[Runtime] = None + ) -> list: """ Retrieves a list of all storage functions in metadata active at given block_hash (or chaintip if block_hash is omitted) Args: block_hash: hash of the blockchain block whose runtime to use + runtime: Optional `Runtime` whose metadata to use Returns: list of storage functions """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) storage_list = [] - for module_idx, module in enumerate(self.metadata.pallets): + for module_idx, module in enumerate(runtime.metadata.pallets): if module.storage: for storage in module.storage: storage_list.append( self.serialize_storage_item( storage_item=storage, module=module, - spec_version_id=self.runtime.runtime_version, + spec_version_id=runtime.runtime_version, + runtime=runtime, ) ) return storage_list async def get_metadata_storage_function( - self, module_name, storage_name, block_hash=None + self, + module_name, + storage_name, + block_hash=None, + runtime: Optional[Runtime] = None, ): """ Retrieves the details of a storage function for given module name, call function name and block_hash @@ -1291,47 +1315,57 @@ async def get_metadata_storage_function( module_name storage_name block_hash + runtime: Optional `Runtime` whose metadata to use Returns: Metadata storage function """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) - pallet = self.metadata.get_metadata_pallet(module_name) + pallet = runtime.metadata.get_metadata_pallet(module_name) if pallet: return pallet.get_storage_function(storage_name) async def get_metadata_errors( - self, block_hash=None + 
self, block_hash=None, runtime: Optional[Runtime] = None ) -> list[dict[str, Optional[str]]]: """ Retrieves a list of all errors in metadata active at given block_hash (or chaintip if block_hash is omitted) Args: block_hash: hash of the blockchain block whose metadata to use + runtime: Optional `Runtime` whose metadata to use Returns: list of errors in the metadata """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) error_list = [] - for module_idx, module in enumerate(self.runtime.metadata.pallets): + for module_idx, module in enumerate(runtime.metadata.pallets): if module.errors: for error in module.errors: error_list.append( self.serialize_module_error( module=module, error=error, - spec_version=self.runtime.runtime_version, + spec_version=runtime.runtime_version, ) ) return error_list - async def get_metadata_error(self, module_name, error_name, block_hash=None): + async def get_metadata_error( + self, + module_name: str, + error_name: str, + block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, + ): """ Retrieves the details of an error for given module name, call function name and block_hash @@ -1339,21 +1373,23 @@ async def get_metadata_error(self, module_name, error_name, block_hash=None): module_name: module name for the error lookup error_name: error name for the error lookup block_hash: hash of the blockchain block whose metadata to use + runtime: Optional `Runtime` whose metadata to use Returns: error """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) - for module_idx, module in enumerate(self.runtime.metadata.pallets): + for module_idx, module in enumerate(runtime.metadata.pallets): if module.name == module_name and module.errors: for error in module.errors: if error_name == error.name: return error async def get_metadata_runtime_call_functions( - self, block_hash: str = None + self, block_hash: str = None, runtime: Optional[Runtime] = None ) -> list[GenericRuntimeCallDefinition]: """ Get a list of available runtime API calls @@ -1361,83 +1397,61 @@ async def get_metadata_runtime_call_functions( Returns: list of runtime call functions """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) call_functions = [] - for api, methods in self.runtime_config.type_registry["runtime_api"].items(): + for api, methods in runtime.runtime_config.type_registry["runtime_api"].items(): for method in methods["methods"].keys(): call_functions.append( - await self.get_metadata_runtime_call_function(api, method) + await self.get_metadata_runtime_call_function( + api, method, runtime=runtime + ) ) return call_functions async def get_metadata_runtime_call_function( - self, api: str, method: str, block_hash: str = None - ) -> GenericRuntimeCallDefinition: - """ - Get details of a runtime API call - - Args: - api: Name of the runtime API e.g. 'TransactionPaymentApi' - method: Name of the method e.g. 
'query_fee_details' - - Returns: - runtime call function - """ - await self.init_runtime(block_hash=block_hash) - - try: - runtime_call_def = self.runtime_config.type_registry["runtime_api"][api][ - "methods" - ][method] - runtime_call_def["api"] = api - runtime_call_def["method"] = method - runtime_api_types = self.runtime_config.type_registry["runtime_api"][ - api - ].get("types", {}) - except KeyError: - raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") - - # Add runtime API types to registry - self.runtime_config.update_type_registry_types(runtime_api_types) - - runtime_call_def_obj = await self.create_scale_object("RuntimeCallDefinition") - runtime_call_def_obj.encode(runtime_call_def) - - return runtime_call_def_obj - - async def get_metadata_runtime_call_function( - self, api: str, method: str + self, + api: str, + method: str, + block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, ) -> GenericRuntimeCallDefinition: """ - Get details of a runtime API call + Get details of a runtime API call. If not supplying `block_hash` or `runtime`, the runtime of the current block + will be used. Args: api: Name of the runtime API e.g. 'TransactionPaymentApi' method: Name of the method e.g. 'query_fee_details' + block_hash: Hash of the block whose runtime to use, if not specifying `runtime` + runtime: The `Runtime` object whose metadata to use. Returns: GenericRuntimeCallDefinition """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) try: - runtime_call_def = self.runtime_config.type_registry["runtime_api"][api][ + runtime_call_def = runtime.runtime_config.type_registry["runtime_api"][api][ "methods" ][method] runtime_call_def["api"] = api runtime_call_def["method"] = method - runtime_api_types = self.runtime_config.type_registry["runtime_api"][ + runtime_api_types = runtime.runtime_config.type_registry["runtime_api"][ api ].get("types", {}) except KeyError: raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") # Add runtime API types to registry - self.runtime_config.update_type_registry_types(runtime_api_types) + runtime.runtime_config.update_type_registry_types(runtime_api_types) - runtime_call_def_obj = await self.create_scale_object("RuntimeCallDefinition") + runtime_call_def_obj = await self.create_scale_object( + "RuntimeCallDefinition", runtime=runtime + ) runtime_call_def_obj.encode(runtime_call_def) return runtime_call_def_obj @@ -1452,7 +1466,7 @@ async def _get_block_handler( subscription_handler: Optional[Callable[[dict], Awaitable[Any]]] = None, ): try: - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) except BlockNotFound: return None @@ -1467,15 +1481,15 @@ async def decode_block(block_data, block_data_hash=None) -> dict[str, Any]: block_data["header"]["number"], 16 ) - extrinsic_cls = self.runtime_config.get_decoder_class("Extrinsic") + extrinsic_cls = runtime.runtime_config.get_decoder_class("Extrinsic") if "extrinsics" in block_data: for idx, extrinsic_data in enumerate(block_data["extrinsics"]): try: extrinsic_decoder = extrinsic_cls( data=ScaleBytes(extrinsic_data), - metadata=self.runtime.metadata, - runtime_config=self.runtime_config, + metadata=runtime.metadata, + runtime_config=runtime.runtime_config, ) extrinsic_decoder.decode(check_remaining=True) block_data["extrinsics"][idx] = extrinsic_decoder @@ -1489,7 +1503,7 @@ async def decode_block(block_data, 
block_data_hash=None) -> dict[str, Any]: if isinstance(log_data, str): # Convert digest log from hex (backwards compatibility) try: - log_digest_cls = self.runtime_config.get_decoder_class( + log_digest_cls = runtime.runtime_config.get_decoder_class( "sp_runtime::generic::digest::DigestItem" ) @@ -1506,17 +1520,20 @@ async def decode_block(block_data, block_data_hash=None) -> dict[str, Any]: block_data["header"]["digest"]["logs"][idx] = log_digest if include_author and "PreRuntime" in log_digest.value: - if self.implements_scaleinfo: + if runtime.implements_scaleinfo: engine = bytes(log_digest[1][0]) # Retrieve validator set parent_hash = block_data["header"]["parentHash"] validator_set = await self.query( - "Session", "Validators", block_hash=parent_hash + "Session", + "Validators", + block_hash=parent_hash, + runtime=runtime, ) if engine == b"BABE": babe_predigest = ( - self.runtime_config.create_scale_object( + runtime.runtime_config.create_scale_object( type_string="RawBabePreDigest", data=ScaleBytes( bytes(log_digest[1][1]) @@ -1539,7 +1556,7 @@ async def decode_block(block_data, block_data_hash=None) -> dict[str, Any]: elif engine == b"aura": aura_predigest = ( - self.runtime_config.create_scale_object( + runtime.runtime_config.create_scale_object( type_string="RawAuraPreDigest", data=ScaleBytes( bytes(log_digest[1][1]) @@ -1568,6 +1585,7 @@ async def decode_block(block_data, block_data_hash=None) -> dict[str, Any]: "Session", "Validators", block_hash=block_hash, + runtime=runtime, ) rank_validator = log_digest.value["PreRuntime"][ "data" @@ -1626,19 +1644,24 @@ async def result_handler( ) ], result_handler=result_handler, + runtime=runtime, ) return result["_get_block_handler"][-1] else: if header_only: - response = await self.rpc_request("chain_getHeader", [block_hash]) + response = await self.rpc_request( + "chain_getHeader", [block_hash], runtime=runtime + ) return await decode_block( {"header": response["result"]}, block_data_hash=block_hash ) else: - response = await self.rpc_request("chain_getBlock", [block_hash]) + response = await self.rpc_request( + "chain_getBlock", [block_hash], runtime=runtime + ) return await decode_block( response["result"]["block"], block_data_hash=block_hash ) @@ -1887,11 +1910,16 @@ def convert_event_data(data): attributes = attributes_data if isinstance(attributes, dict): for key, value in attributes.items(): + if key == "who": + who = ss58_encode(bytes(value[0]), self.ss58_format) + attributes["who"] = who if isinstance(value, dict): # Convert nested single-key dictionaries to their keys as strings - sub_key = next(iter(value.keys())) - if value[sub_key] == (): - attributes[key] = sub_key + for sub_key, sub_value in value.items(): + if isinstance(sub_value, dict): + for sub_sub_key, sub_sub_value in sub_value.items(): + if sub_sub_value == (): + attributes[key][sub_key] = sub_sub_key # Create the converted dictionary converted = { @@ -1917,7 +1945,12 @@ def convert_event_data(data): ) if storage_obj: for item in list(storage_obj): - events.append(convert_event_data(item)) + try: + events.append(convert_event_data(item)) + except ( + AttributeError + ): # indicates this was legacy decoded with scalecodec + events.append(item) return events async def get_metadata(self, block_hash=None) -> MetadataV15: @@ -2061,13 +2094,16 @@ async def _preprocess( storage_function: str, module: str, raw_storage_key: Optional[bytes] = None, + runtime: Optional[Runtime] = None, ) -> Preprocessed: """ Creates a Preprocessed data object for passing to 
`_make_rpc_request` """ params = query_for if query_for else [] # Search storage call in metadata - metadata_pallet = self.runtime.metadata.get_metadata_pallet(module) + if not runtime: + runtime = self.runtime + metadata_pallet = runtime.metadata.get_metadata_pallet(module) if not metadata_pallet: raise SubstrateRequestException(f'Pallet "{module}" not found') @@ -2094,16 +2130,16 @@ async def _preprocess( pallet=module, storage_function=storage_function, value_scale_type=value_scale_type, - metadata=self.metadata, - runtime_config=self.runtime_config, + metadata=runtime.metadata, + runtime_config=runtime.runtime_config, ) else: storage_key = StorageKey.create_from_storage_function( module, storage_item.value["name"], params, - runtime_config=self.runtime_config, - metadata=self.runtime.metadata, + runtime_config=runtime.runtime_config, + metadata=runtime.metadata, ) method = "state_getStorageAt" return Preprocessed( @@ -2121,6 +2157,7 @@ async def _process_response( value_scale_type: Optional[str] = None, storage_item: Optional[ScaleType] = None, result_handler: Optional[ResultHandler] = None, + runtime: Optional[Runtime] = None, ) -> tuple[Any, bool]: """ Processes the RPC call response by decoding it, returning it as is, or setting a handler for subscriptions, @@ -2132,6 +2169,7 @@ async def _process_response( value_scale_type: Scale Type string used for decoding ScaleBytes results storage_item: The ScaleType object used for decoding ScaleBytes results result_handler: the result handler coroutine used for handling longer-running subscriptions + runtime: Optional Runtime to use for decoding. If not specified, the currently-loaded `self.runtime` is used Returns: (decoded response, completion) @@ -2153,7 +2191,7 @@ async def _process_response( q = bytes(query_value) else: q = query_value - result = await self.decode_scale(value_scale_type, q) + result = await self.decode_scale(value_scale_type, q, runtime=runtime) if asyncio.iscoroutinefunction(result_handler): # For multipart responses as a result of subscriptions. message, bool_result = await result_handler(result, subscription_id) @@ -2167,6 +2205,7 @@ async def _make_rpc_request( storage_item: Optional[ScaleType] = None, result_handler: Optional[ResultHandler] = None, attempt: int = 1, + runtime: Optional[Runtime] = None, ) -> RequestManager.RequestResults: request_manager = RequestManager(payloads) @@ -2210,6 +2249,7 @@ async def _make_rpc_request( value_scale_type, storage_item, result_handler, + runtime=runtime, ) request_manager.add_response( @@ -2269,6 +2309,7 @@ async def rpc_request( result_handler: Optional[ResultHandler] = None, block_hash: Optional[str] = None, reuse_block_hash: bool = False, + runtime: Optional[Runtime] = None, ) -> Any: """ Makes an RPC request to the subtensor. Use this only if `self.query` and `self.query_multiple` and @@ -2282,6 +2323,8 @@ async def rpc_request( hash in the params, and not reusing the block hash reuse_block_hash: whether to reuse the block hash in the params — only mark as True if not supplying the block hash in the params, or via the `block_hash` parameter + runtime: Optional runtime to be used for decoding results of the request. If not specified, the + currently-loaded `self.runtime` is used. 
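As a sketch of how the new `runtime` parameter threads through the request path: a caller that has already resolved a `Runtime` can forward it so that `_process_response` decodes storage results with it instead of re-resolving; for plain RPC methods it is simply carried along unused. (A hedged example; `block_hash` is assumed to be already known.)

```python
# Sketch: forward an already-resolved Runtime through rpc_request. For plain
# RPC methods it is carried along unused; it matters on paths (subscriptions,
# storage queries) where _process_response decodes SCALE results.
runtime = await substrate.init_runtime(block_hash=block_hash)
response = await substrate.rpc_request(
    "chain_getBlock",
    [block_hash],
    runtime=runtime,
)
block = response["result"]["block"]
```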
Returns: the response from the RPC request @@ -2296,7 +2339,9 @@ async def rpc_request( params + [block_hash] if block_hash else params, ) ] - result = await self._make_rpc_request(payloads, result_handler=result_handler) + result = await self._make_rpc_request( + payloads, result_handler=result_handler, runtime=runtime + ) if "error" in result[payload_id][0]: if "Failed to get runtime version" in ( err_msg := result[payload_id][0]["error"]["message"] @@ -2304,9 +2349,14 @@ async def rpc_request( logger.warning( "Failed to get runtime. Re-fetching from chain, and retrying." ) - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) return await self.rpc_request( - method, params, result_handler, block_hash, reuse_block_hash + method, + params, + result_handler, + block_hash, + reuse_block_hash, + runtime=runtime, ) elif ( "Client error: Api called for an unknown Block: State already discarded" @@ -2372,10 +2422,10 @@ async def compose_call( if call_params is None: call_params = {} - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) - call = self.runtime_config.create_scale_object( - type_string="Call", metadata=self.runtime.metadata + call = runtime.runtime_config.create_scale_object( + type_string="Call", metadata=runtime.metadata ) call.encode( @@ -2395,6 +2445,7 @@ async def query_multiple( module: str, block_hash: Optional[str] = None, reuse_block_hash: bool = False, + runtime: Optional[Runtime] = None, ) -> dict[str, ScaleType]: """ Queries the subtensor. Only use this when making multiple queries, else use ``self.query`` @@ -2405,10 +2456,13 @@ async def query_multiple( block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) if block_hash: self.last_block_hash = block_hash - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) preprocessed: tuple[Preprocessed] = await asyncio.gather( *[ - self._preprocess([x], block_hash, storage_function, module) + self._preprocess( + [x], block_hash, storage_function, module, runtime=runtime + ) for x in params ] ) @@ -2421,14 +2475,17 @@ async def query_multiple( storage_item = preprocessed[0].storage_item responses = await self._make_rpc_request( - all_info, value_scale_type, storage_item + all_info, value_scale_type, storage_item, runtime=runtime ) return { param: responses[p.queryable][0] for (param, p) in zip(params, preprocessed) } async def query_multi( - self, storage_keys: list[StorageKey], block_hash: Optional[str] = None + self, + storage_keys: list[StorageKey], + block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, ) -> list: """ Query multiple storage keys in one request. @@ -2451,15 +2508,20 @@ async def query_multi( Args: storage_keys: list of StorageKey objects block_hash: hash of the block to query against + runtime: Optional `Runtime` to be used for decoding. If not specified, the currently-loaded `self.runtime` + is used. 
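A sketch of the batched read this enables — the pallet/storage names and the `addresses` iterable are illustrative placeholders:

```python
# Sketch: one state_queryStorageAt round-trip for several keys, decoded with
# an explicitly pinned runtime. "System"/"Account" and `addresses` are
# placeholders for whatever storage you actually query.
runtime = await substrate.init_runtime(block_hash=block_hash)
storage_keys = [
    await substrate.create_storage_key(
        "System", "Account", [address], block_hash=block_hash
    )
    for address in addresses
]
for storage_key, value in await substrate.query_multi(
    storage_keys, block_hash=block_hash, runtime=runtime
):
    print(storage_key, value)
```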
Returns: list of `(storage_key, scale_obj)` tuples """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) # Retrieve corresponding value response = await self.rpc_request( - "state_queryStorageAt", [[s.to_hex() for s in storage_keys], block_hash] + "state_queryStorageAt", + [[s.to_hex() for s in storage_keys], block_hash], + runtime=runtime, ) if "error" in response: @@ -2481,7 +2543,7 @@ async def query_multi( ( storage_key, await self.decode_scale( - storage_key.value_scale_type, change_data + storage_key.value_scale_type, change_data, runtime=runtime ), ), ) @@ -2493,6 +2555,7 @@ async def create_scale_object( type_string: str, data: Optional[ScaleBytes] = None, block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, **kwargs, ) -> "ScaleType": """ @@ -2503,16 +2566,19 @@ async def create_scale_object( type_string: Name of SCALE type to create data: ScaleBytes: ScaleBytes to decode block_hash: block hash for moment of decoding, when omitted the chain tip will be used + runtime: Optional `Runtime` to use for the creation of the scale object. If not specified, the + currently-loaded `self.runtime` will be used. kwargs: keyword args for the Scale Type constructor Returns: The created Scale Type object """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) if "metadata" not in kwargs: - kwargs["metadata"] = self.runtime.metadata + kwargs["metadata"] = runtime.metadata - return self.runtime.runtime_config.create_scale_object( + return runtime.runtime_config.create_scale_object( type_string, data=data, **kwargs ) @@ -2527,6 +2593,7 @@ async def generate_signature_payload( ) -> ScaleBytes: # Retrieve genesis hash genesis_hash = await self.get_block_hash(0) + runtime = await self.init_runtime(block_hash=None) if not era: era = "00" @@ -2536,7 +2603,7 @@ async def generate_signature_payload( block_hash = genesis_hash else: # Determine mortality of extrinsic - era_obj = self.runtime_config.create_scale_object("Era") + era_obj = runtime.runtime_config.create_scale_object("Era") if isinstance(era, dict) and "current" not in era and "phase" not in era: raise ValueError( @@ -2549,17 +2616,17 @@ async def generate_signature_payload( ) # Create signature payload - signature_payload = self.runtime_config.create_scale_object( + signature_payload = runtime.runtime_config.create_scale_object( "ExtrinsicPayloadValue" ) # Process signed extensions in metadata - if "signed_extensions" in self.runtime.metadata[1][1]["extrinsic"]: + if "signed_extensions" in runtime.metadata[1][1]["extrinsic"]: # Base signature payload signature_payload.type_mapping = [["call", "CallBytes"]] # Add signed extensions to payload - signed_extensions = self.runtime.metadata.get_signed_extensions() + signed_extensions = runtime.metadata.get_signed_extensions() if "CheckMortality" in signed_extensions: signature_payload.type_mapping.append( @@ -2648,10 +2715,10 @@ async def generate_signature_payload( "era": era, "nonce": nonce, "tip": tip, - "spec_version": self.runtime.runtime_version, + "spec_version": runtime.runtime_version, "genesis_hash": genesis_hash, "block_hash": block_hash, - "transaction_version": self.runtime.transaction_version, + "transaction_version": runtime.transaction_version, "asset_id": {"tip": tip, "asset_id": tip_asset_id}, "metadata_hash": None, "mode": "Disabled", @@ -2693,16 +2760,16 @@ async def create_signed_extrinsic( The signed Extrinsic """ # 
only support creating extrinsics for current block - await self.init_runtime(block_id=await self.get_block_number()) + runtime = await self.init_runtime() # Check requirements if not isinstance(call, GenericCall): raise TypeError("'call' must be of type Call") # Check if extrinsic version is supported - if self.runtime.metadata[1][1]["extrinsic"]["version"] != 4: # type: ignore + if runtime.metadata[1][1]["extrinsic"]["version"] != 4: # type: ignore raise NotImplementedError( - f"Extrinsic version {self.runtime.metadata[1][1]['extrinsic']['version']} not supported" # type: ignore + f"Extrinsic version {runtime.metadata[1][1]['extrinsic']['version']} not supported" # type: ignore ) # Retrieve nonce @@ -2746,7 +2813,7 @@ async def create_signed_extrinsic( # Create extrinsic extrinsic = self.runtime_config.create_scale_object( - type_string="Extrinsic", metadata=self.runtime.metadata + type_string="Extrinsic", metadata=runtime.metadata ) value = { @@ -2763,8 +2830,8 @@ async def create_signed_extrinsic( } # Check if ExtrinsicSignature is MultiSignature, otherwise omit signature_version - signature_cls = self.runtime_config.get_decoder_class("ExtrinsicSignature") - if issubclass(signature_cls, self.runtime_config.get_decoder_class("Enum")): + signature_cls = runtime.runtime_config.get_decoder_class("ExtrinsicSignature") + if issubclass(signature_cls, runtime.runtime_config.get_decoder_class("Enum")): value["signature_version"] = signature_version extrinsic.encode(value) @@ -2821,6 +2888,7 @@ async def _do_runtime_call_old( method: str, params: Optional[Union[list, dict]] = None, block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, ) -> ScaleType: logger.debug( f"Decoding old runtime call: {api}.{method} with params: {params} at block hash: {block_hash}" @@ -2851,10 +2919,14 @@ async def _do_runtime_call_old( # RPC request result_data = await self.rpc_request( - "state_call", [f"{api}_{method}", param_data.hex(), block_hash] + "state_call", + [f"{api}_{method}", param_data.hex(), block_hash], + runtime=runtime, ) result_vec_u8_bytes = hex_to_bytes(result_data["result"]) - result_bytes = await self.decode_scale("Vec", result_vec_u8_bytes) + result_bytes = await self.decode_scale( + "Vec", result_vec_u8_bytes, runtime=runtime + ) # Decode result # Get correct type @@ -2890,20 +2962,32 @@ async def runtime_call( params = {} try: - metadata_v15_value = runtime.metadata_v15.value() + if runtime.metadata_v15 is None: + _ = self.runtime_config.type_registry["runtime_api"][api]["methods"][ + method + ] + runtime_api_types = self.runtime_config.type_registry["runtime_api"][ + api + ].get("types", {}) + runtime.runtime_config.update_type_registry_types(runtime_api_types) + return await self._do_runtime_call_old( + api, method, params, block_hash, runtime=runtime + ) - apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]} - api_entry = apis[api] - methods = {entry["name"]: entry for entry in api_entry["methods"]} - runtime_call_def = methods[method] + else: + metadata_v15_value = runtime.metadata_v15.value() + + apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]} + api_entry = apis[api] + methods = {entry["name"]: entry for entry in api_entry["methods"]} + runtime_call_def = methods[method] + if _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value): + return await self._do_runtime_call_old( + api, method, params, block_hash, runtime=runtime + ) except KeyError: raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") - 
if _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value): - result = await self._do_runtime_call_old(api, method, params, block_hash) - - return result - if isinstance(params, list) and len(params) != len(runtime_call_def["inputs"]): raise ValueError( f"Number of parameter provided ({len(params)}) does not " @@ -2926,13 +3010,17 @@ async def runtime_call( # RPC request result_data = await self.rpc_request( - "state_call", [f"{api}_{method}", param_data.hex(), block_hash] + "state_call", + [f"{api}_{method}", param_data.hex(), block_hash], + runtime=runtime, ) output_type_string = f"scale_info::{runtime_call_def['output']}" # Decode result result_bytes = hex_to_bytes(result_data["result"]) - result_obj = ScaleObj(await self.decode_scale(output_type_string, result_bytes)) + result_obj = ScaleObj( + await self.decode_scale(output_type_string, result_bytes, runtime=runtime) + ) return result_obj @@ -2998,7 +3086,7 @@ async def get_metadata_constants(self, block_hash=None) -> list[dict]: constant_list = [] - for module_idx, module in enumerate(self.metadata.pallets): + for module_idx, module in enumerate(runtime.metadata.pallets): for constant in module.constants or []: constant_list.append( self.serialize_constant(constant, module, runtime.runtime_version) @@ -3006,7 +3094,13 @@ async def get_metadata_constants(self, block_hash=None) -> list[dict]: return constant_list - async def get_metadata_constant(self, module_name, constant_name, block_hash=None): + async def get_metadata_constant( + self, + module_name: str, + constant_name: str, + block_hash: Optional[str] = None, + runtime: Optional[Runtime] = None, + ): """ Retrieves the details of a constant for given module name, call function name and block_hash (or chaintip if block_hash is omitted) @@ -3015,13 +3109,15 @@ async def get_metadata_constant(self, module_name, constant_name, block_hash=Non module_name: name of the module you are querying constant_name: name of the constant you are querying block_hash: hash of the block at which to make the runtime API call + runtime: Runtime whose metadata you are querying. 
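For illustration, a hedged sketch of reading a constant against an explicit runtime — `Balances.ExistentialDeposit` is a common FRAME constant used here only as an example:

```python
# Sketch: decode a module constant with a pinned runtime. The constant name is
# illustrative; any pallet constant present in the runtime's metadata works.
runtime = await substrate.init_runtime(block_hash=block_hash)
existential_deposit = await substrate.get_constant(
    "Balances",
    "ExistentialDeposit",
    block_hash=block_hash,
    runtime=runtime,
)
if existential_deposit is not None:
    print(existential_deposit.value)  # decoded ScaleObj payload
```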
Returns: MetadataModuleConstants """ - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) - for module in self.runtime.metadata.pallets: + for module in runtime.metadata.pallets: if module_name == module.name and module.constants: for constant in module.constants: if constant_name == constant.value["name"]: @@ -3033,6 +3129,7 @@ async def get_constant( constant_name: str, block_hash: Optional[str] = None, reuse_block_hash: bool = False, + runtime: Optional[Runtime] = None, ) -> Optional[ScaleObj]: """ Returns the decoded `ScaleType` object of the constant for given module name, call function name and block_hash @@ -3043,18 +3140,22 @@ async def get_constant( constant_name: Name of the constant to query block_hash: Hash of the block at which to make the runtime API call reuse_block_hash: Reuse last-used block hash if set to true + runtime: Runtime to use for querying the constant Returns: ScaleType from the runtime call """ block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) constant = await self.get_metadata_constant( - module_name, constant_name, block_hash=block_hash + module_name, constant_name, block_hash=block_hash, runtime=runtime ) if constant: # Decode to ScaleType return await self.decode_scale( - constant.type, bytes(constant.constant_value), return_scale_obj=True + constant.type, + bytes(constant.constant_value), + return_scale_obj=True, + runtime=runtime, ) else: return None @@ -3114,14 +3215,14 @@ async def get_type_registry( Returns: dict mapping the type strings to the type decompositions """ - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) - if not self.implements_scaleinfo: + if not runtime.implements_scaleinfo: raise NotImplementedError("MetadataV14 or higher runtimes is required") type_registry = {} - for scale_info_type in self.metadata.portable_registry["types"]: + for scale_info_type in runtime.metadata.portable_registry["types"]: if ( "path" in scale_info_type.value["type"] and len(scale_info_type.value["type"]["path"]) > 0 @@ -3163,21 +3264,21 @@ async def get_metadata_modules(self, block_hash=None) -> list[dict[str, Any]]: Returns: List of metadata modules """ - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) return [ { "metadata_index": idx, "module_id": module.get_identifier(), "name": module.name, - "spec_version": self.runtime.runtime_version, + "spec_version": runtime.runtime_version, "count_call_functions": len(module.calls or []), "count_storage_functions": len(module.storage or []), "count_events": len(module.events or []), "count_constants": len(module.constants or []), "count_errors": len(module.errors or []), } - for idx, module in enumerate(self.metadata.pallets) + for idx, module in enumerate(runtime.metadata.pallets) ] async def get_metadata_module(self, name, block_hash=None) -> ScaleType: @@ -3191,9 +3292,9 @@ async def get_metadata_module(self, name, block_hash=None) -> ScaleType: Returns: MetadataModule """ - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) - return self.metadata.get_metadata_pallet(name) + return runtime.metadata.get_metadata_pallet(name) async def query( self, @@ -3204,6 +3305,7 @@ async def query( raw_storage_key: Optional[bytes] = None, subscription_handler=None, reuse_block_hash: bool = False, + runtime: Optional[Runtime] = None, ) -> 
Optional[Union["ScaleObj", Any]]: """ Queries substrate. This should only be used when making a single request. For multiple requests, @@ -3212,9 +3314,15 @@ async def query( block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) if block_hash: self.last_block_hash = block_hash - await self.init_runtime(block_hash=block_hash) + if not runtime: + runtime = await self.init_runtime(block_hash=block_hash) preprocessed: Preprocessed = await self._preprocess( - params, block_hash, storage_function, module, raw_storage_key + params, + block_hash, + storage_function, + module, + raw_storage_key, + runtime=runtime, ) payload = [ self.make_payload( @@ -3229,6 +3337,7 @@ async def query( value_scale_type, storage_item, result_handler=subscription_handler, + runtime=runtime, ) result = responses[preprocessed.queryable][0] if isinstance(result, (list, tuple, int, float)): @@ -3286,7 +3395,7 @@ async def query_map( self.last_block_hash = block_hash runtime = await self.init_runtime(block_hash=block_hash) - metadata_pallet = self.runtime.metadata.get_metadata_pallet(module) + metadata_pallet = runtime.metadata.get_metadata_pallet(module) if not metadata_pallet: raise ValueError(f'Pallet "{module}" not found') storage_item = metadata_pallet.get_storage_function(storage_function) @@ -3313,8 +3422,8 @@ async def query_map( module, storage_item.value["name"], params, - runtime_config=self.runtime_config, - metadata=self.runtime.metadata, + runtime_config=runtime.runtime_config, + metadata=runtime.metadata, ) prefix = storage_key.to_hex() @@ -3329,6 +3438,7 @@ async def query_map( response = await self.rpc_request( method="state_getKeysPaged", params=[prefix, page_size, start_key, block_hash], + runtime=runtime, ) if "error" in response: @@ -3344,7 +3454,9 @@ async def query_map( # Retrieve corresponding value response = await self.rpc_request( - method="state_queryStorageAt", params=[result_keys, block_hash] + method="state_queryStorageAt", + params=[result_keys, block_hash], + runtime=runtime, ) if "error" in response: @@ -3600,9 +3712,9 @@ async def get_metadata_call_function( Returns: list of call functions """ - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) - for pallet in self.runtime.metadata.pallets: + for pallet in runtime.metadata.pallets: if pallet.name == module_name and pallet.calls: for call in pallet.calls: if call.name == call_function_name: @@ -3624,7 +3736,7 @@ async def get_metadata_events(self, block_hash=None) -> list[dict]: event_list = [] - for event_index, (module, event) in self.metadata.event_index.items(): + for event_index, (module, event) in runtime.metadata.event_index.items(): event_list.append( self.serialize_module_event( module, event, runtime.runtime_version, event_index diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index b5148a8..8a6bf66 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -13,17 +13,15 @@ ss58_encode, MultiAccountId, ) -from scalecodec.base import RuntimeConfigurationObject, ScaleBytes, ScaleType +from scalecodec.base import ScaleBytes, ScaleType from websockets.sync.client import connect, ClientConnection from websockets.exceptions import ConnectionClosed -from async_substrate_interface.const import SS58_FORMAT from async_substrate_interface.errors import ( ExtrinsicNotFound, SubstrateRequestException, BlockNotFound, MaxRetriesExceeded, - 
MetadataAtVersionNotFound, StateDiscardedError, ) from async_substrate_interface.protocols import Keypair @@ -45,6 +43,7 @@ _determine_if_old_runtime_call, _bt_decode_to_dict_or_list, decode_query_map, + legacy_scale_decode, ) from async_substrate_interface.utils.storage import StorageKey from async_substrate_interface.type_registry import _TYPE_REGISTRY @@ -506,6 +505,9 @@ def __init__( _log_raw_websockets: whether to log raw websocket requests during RPC requests """ + super().__init__( + type_registry, type_registry_preset, use_remote_preset, ss58_format + ) self.max_retries = max_retries self.retry_timeout = retry_timeout self.chain_endpoint = url @@ -518,17 +520,10 @@ "strict_scale_decode": True, } self.initialized = False - self.ss58_format = ss58_format self.type_registry = type_registry self.type_registry_preset = type_registry_preset self.runtime_cache = RuntimeCache() - self.runtime_config = RuntimeConfigurationObject( - ss58_format=self.ss58_format, implements_scale_info=True - ) self.metadata_version_hex = "0x0f000000" # v15 - self.reload_type_registry() - self.registry_type_map = {} - self.type_id_to_name = {} self._mock = _mock self.log_raw_websockets = _log_raw_websockets if not _mock: @@ -558,11 +553,28 @@ def initialize(self): chain = self.rpc_request("system_chain", []) self._chain = chain.get("result") self.init_runtime() + if self.ss58_format is None: + # Check and apply runtime constants + ss58_prefix_constant = self.get_constant( + "System", "SS58Prefix", block_hash=self.last_block_hash + ) + if ss58_prefix_constant: + self.ss58_format = ss58_prefix_constant.value self.initialized = True def __exit__(self, exc_type, exc_val, exc_tb): self.ws.close() + @property + def metadata(self): + if not self.runtime or self.runtime.metadata is None: + raise AttributeError( + "Metadata not found. This generally indicates that the SubstrateInterface object " + "is not properly initialized."
+ ) + else: + return self.runtime.metadata + @property def properties(self): if self._properties is None: @@ -645,14 +657,13 @@ def _load_registry_at_block(self, block_hash: Optional[str]) -> MetadataV15: "Client error: Execution failed: Other: Exported method Metadata_metadata_at_version is not found" in e.args ): - raise MetadataAtVersionNotFound + return None, None else: raise e metadata_option_hex_str = metadata_rpc_result["result"] metadata_option_bytes = bytes.fromhex(metadata_option_hex_str[2:]) metadata = MetadataV15.decode_from_metadata_option(metadata_option_bytes) registry = PortableRegistry.from_metadata_v15(metadata) - self._load_registry_type_map(registry) return metadata, registry def decode_scale( @@ -676,9 +687,14 @@ def decode_scale( """ if type_string == "scale_info::0": # Is an AccountId # Decode AccountId bytes to SS58 address - return ss58_encode(scale_bytes, SS58_FORMAT) + return ss58_encode(scale_bytes, self.ss58_format) else: - obj = decode_by_type_string(type_string, self.runtime.registry, scale_bytes) + if self.runtime.metadata_v15 is not None: + obj = decode_by_type_string( + type_string, self.runtime.registry, scale_bytes + ) + else: + obj = legacy_scale_decode(type_string, scale_bytes, self.runtime) if return_scale_obj: return ScaleObj(obj) else: @@ -688,21 +704,21 @@ def load_runtime(self, runtime): self.runtime = runtime # Update type registry - self.reload_type_registry(use_remote_preset=False, auto_discover=True) + self.runtime.reload_type_registry(use_remote_preset=False, auto_discover=True) self.runtime_config.set_active_spec_version_id(runtime.runtime_version) - if self.implements_scaleinfo: + if self.runtime.implements_scaleinfo: logger.debug("Add PortableRegistry from metadata to type registry") self.runtime_config.add_portable_registry(runtime.metadata) # Set runtime compatibility flags try: _ = self.runtime_config.create_scale_object("sp_weights::weight_v2::Weight") - self.config["is_weight_v2"] = True + self.runtime.config["is_weight_v2"] = True self.runtime_config.update_type_registry_types( {"Weight": "sp_weights::weight_v2::Weight"} ) except NotImplementedError: - self.config["is_weight_v2"] = False + self.runtime.config["is_weight_v2"] = False self.runtime_config.update_type_registry_types({"Weight": "WeightV1"}) def init_runtime( @@ -727,11 +743,25 @@ def init_runtime( if block_id and block_hash: raise ValueError("Cannot provide block_hash and block_id at the same time") - if block_id: + if block_id is not None: + if runtime := self.runtime_cache.retrieve(block=block_id): + self.runtime = runtime + self.runtime.load_runtime() + if self.runtime.registry: + self.runtime.load_registry_type_map() + return self.runtime block_hash = self.get_block_hash(block_id) if not block_hash: block_hash = self.get_chain_head() + else: + self.last_block_hash = block_hash + if runtime := self.runtime_cache.retrieve(block_hash=block_hash): + self.runtime = runtime + self.runtime.load_runtime() + if self.runtime.registry: + self.runtime.load_registry_type_map() + return self.runtime runtime_version = self.get_block_runtime_version_for(block_hash) if runtime_version is None: @@ -742,58 +772,75 @@ def init_runtime( if self.runtime and runtime_version == self.runtime.runtime_version: return self.runtime - runtime = self.runtime_cache.retrieve(runtime_version=runtime_version) - if not runtime: - self.last_block_hash = block_hash + if runtime := self.runtime_cache.retrieve(runtime_version=runtime_version): + self.runtime = runtime + self.runtime.load_runtime() + if 
self.runtime.registry: + self.runtime.load_registry_type_map() + return runtime + else: + self.runtime = self.get_runtime_for_version(runtime_version, block_hash) + self.runtime.load_runtime() + if self.runtime.registry: + self.runtime.load_registry_type_map() + return self.runtime - runtime_block_hash = self.get_parent_block_hash(block_hash) + def get_runtime_for_version( + self, runtime_version: int, block_hash: Optional[str] = None + ) -> Runtime: + """ + Retrieves the `Runtime` for a given runtime version at a given block hash. + Args: + runtime_version: version of the runtime (from `get_block_runtime_version_for`) + block_hash: hash of the block to query - runtime_info = self.get_block_runtime_info(runtime_block_hash) + Returns: + Runtime object for the given runtime version + """ + if not block_hash: + block_hash = self.get_chain_head() + runtime_block_hash = self.get_parent_block_hash(block_hash) + block_number = self.get_block_number(block_hash) + runtime_info = self.get_block_runtime_info(runtime_block_hash) - metadata = self.get_block_metadata( - block_hash=runtime_block_hash, decode=True - ) - if metadata is None: - # does this ever happen? - raise SubstrateRequestException( - f"No metadata for block '{runtime_block_hash}'" - ) - logger.debug( - "Retrieved metadata for {} from Substrate node".format(runtime_version) + metadata = self.get_block_metadata(block_hash=runtime_block_hash, decode=True) + if metadata is None: + # does this ever happen? + raise SubstrateRequestException( + f"No metadata for block '{runtime_block_hash}'" ) + logger.debug( + "Retrieved metadata for {} from Substrate node".format(runtime_version) + ) - metadata_v15, registry = self._load_registry_at_block( - block_hash=runtime_block_hash - ) + metadata_v15, registry = self._load_registry_at_block( + block_hash=runtime_block_hash + ) + if metadata_v15 is not None: logger.debug( - "Retrieved metadata v15 for {} from Substrate node".format( - runtime_version - ) + f"Retrieved metadata and metadata v15 for {runtime_version} from Substrate node" ) - - runtime = Runtime( - chain=self.chain, - runtime_config=self.runtime_config, - metadata=metadata, - type_registry=self.type_registry, - metadata_v15=metadata_v15, - runtime_info=runtime_info, - registry=registry, - ) - self.runtime_cache.add_item( - runtime_version=runtime_version, runtime=runtime - ) - - self.load_runtime(runtime) - - if self.ss58_format is None: - # Check and apply runtime constants - ss58_prefix_constant = self.get_constant( - "System", "SS58Prefix", block_hash=block_hash + else: + logger.debug( + f"Exported method Metadata_metadata_at_version is not found for {runtime_version}. This indicates the " + f"block is quite old, decoding for this block will use legacy Python decoding." ) - if ss58_prefix_constant: - self.ss58_format = ss58_prefix_constant + runtime = Runtime( + chain=self.chain, + runtime_config=self.runtime_config, + metadata=metadata, + type_registry=self.type_registry, + metadata_v15=metadata_v15, + runtime_info=runtime_info, + registry=registry, + ) + self.runtime_cache.add_item( + block=block_number, + block_hash=block_hash, + runtime_version=runtime_version, + runtime=runtime, + ) return runtime def create_storage_key( @@ -1069,6 +1116,7 @@ def get_metadata_runtime_call_function( Args: api: Name of the runtime API e.g. 'TransactionPaymentApi' method: Name of the method e.g. 
'query_fee_details' + block_hash: block hash whose metadata to query Returns: runtime call function @@ -1095,41 +1143,6 @@ def get_metadata_runtime_call_function( return runtime_call_def_obj - def get_metadata_runtime_call_function( - self, api: str, method: str - ) -> GenericRuntimeCallDefinition: - """ - Get details of a runtime API call - - Args: - api: Name of the runtime API e.g. 'TransactionPaymentApi' - method: Name of the method e.g. 'query_fee_details' - - Returns: - GenericRuntimeCallDefinition - """ - self.init_runtime() - - try: - runtime_call_def = self.runtime_config.type_registry["runtime_api"][api][ - "methods" - ][method] - runtime_call_def["api"] = api - runtime_call_def["method"] = method - runtime_api_types = self.runtime_config.type_registry["runtime_api"][ - api - ].get("types", {}) - except KeyError: - raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") - - # Add runtime API types to registry - self.runtime_config.update_type_registry_types(runtime_api_types) - - runtime_call_def_obj = self.create_scale_object("RuntimeCallDefinition") - runtime_call_def_obj.encode(runtime_call_def) - - return runtime_call_def_obj - def _get_block_handler( self, block_hash: str, @@ -1194,7 +1207,7 @@ def decode_block(block_data, block_data_hash=None) -> dict[str, Any]: block_data["header"]["digest"]["logs"][idx] = log_digest if include_author and "PreRuntime" in log_digest.value: - if self.implements_scaleinfo: + if self.runtime.implements_scaleinfo: engine = bytes(log_digest[1][0]) # Retrieve validator set parent_hash = block_data["header"]["parentHash"] @@ -1569,11 +1582,16 @@ def convert_event_data(data): attributes = attributes_data if isinstance(attributes, dict): for key, value in attributes.items(): + if key == "who": + who = ss58_encode(bytes(value[0]), self.ss58_format) + attributes["who"] = who if isinstance(value, dict): # Convert nested single-key dictionaries to their keys as strings - sub_key = next(iter(value.keys())) - if value[sub_key] == (): - attributes[key] = sub_key + for sub_key, sub_value in value.items(): + if isinstance(sub_value, dict): + for sub_sub_key, sub_sub_value in sub_value.items(): + if sub_sub_value == (): + attributes[key][sub_key] = sub_sub_key # Create the converted dictionary converted = { @@ -1599,7 +1617,12 @@ def convert_event_data(data): ) if storage_obj: for item in list(storage_obj): - events.append(convert_event_data(item)) + try: + events.append(convert_event_data(item)) + except ( + AttributeError + ): # indicates this was legacy decoded with scalecodec + events.append(item) return events def get_metadata(self, block_hash=None) -> MetadataV15: @@ -2520,20 +2543,28 @@ def runtime_call( params = {} try: - metadata_v15_value = runtime.metadata_v15.value() + if runtime.metadata_v15 is None: + _ = self.runtime_config.type_registry["runtime_api"][api]["methods"][ + method + ] + runtime_api_types = self.runtime_config.type_registry["runtime_api"][ + api + ].get("types", {}) + runtime.runtime_config.update_type_registry_types(runtime_api_types) + return self._do_runtime_call_old(api, method, params, block_hash) + else: + metadata_v15_value = runtime.metadata_v15.value() + + apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]} + api_entry = apis[api] + methods = {entry["name"]: entry for entry in api_entry["methods"]} + runtime_call_def = methods[method] + if _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value): + return self._do_runtime_call_old(api, method, params, block_hash) - apis = 
{entry["name"]: entry for entry in metadata_v15_value["apis"]} - api_entry = apis[api] - methods = {entry["name"]: entry for entry in api_entry["methods"]} - runtime_call_def = methods[method] except KeyError: raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") - if _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value): - result = self._do_runtime_call_old(api, method, params, block_hash) - - return result - if isinstance(params, list) and len(params) != len(runtime_call_def["inputs"]): raise ValueError( f"Number of parameter provided ({len(params)}) does not " @@ -2545,13 +2576,15 @@ def runtime_call( for idx, param in enumerate(runtime_call_def["inputs"]): param_type_string = f"scale_info::{param['ty']}" if isinstance(params, list): - param_data += self.encode_scale(param_type_string, params[idx]) + param_data += self.encode_scale( + param_type_string, params[idx], runtime=runtime + ) else: if param["name"] not in params: raise ValueError(f"Runtime Call param '{param['name']}' is missing") param_data += self.encode_scale( - param_type_string, params[param["name"]] + param_type_string, params[param["name"]], runtime=runtime ) # RPC request @@ -2733,7 +2766,7 @@ def get_type_registry(self, block_hash: str = None, max_recursion: int = 4) -> d """ self.init_runtime(block_hash=block_hash) - if not self.implements_scaleinfo: + if not self.runtime.implements_scaleinfo: raise NotImplementedError("MetadataV14 or higher runtimes is required") type_registry = {} @@ -2895,7 +2928,6 @@ def query_map( Returns: QueryMapResult object """ - hex_to_bytes_ = hex_to_bytes params = params or [] block_hash = self._get_current_block_hash(block_hash, reuse_block_hash) if block_hash: diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index e29e30c..1d330f5 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -15,19 +15,33 @@ from .const import SS58_FORMAT from .utils import json - logger = logging.getLogger("async_substrate_interface") class RuntimeCache: + """ + Cache that holds all the Runtime objects used by AsyncSubstrateInterface and SubstrateInterface. See the docstring + for Runtime for more information about Runtime objects specifically. + + For SubstrateInterface (sync), this serves purely as a quick way of retrieving a previously loaded Runtime. For + AsyncSubstrateInterface, this is very important, as, while it does the same as for SubstrateInterface, it also + serves as an easy way for a user to fetch a Runtime whose registry or metadata they wish to utilize in some way. + + The `last_used` attribute is always updated with the most recently inserted or retrieved Runtime object. If you're + querying numerous blocks at once with different runtimes, and you wish to use the metadata or registry directly, it + is important you are utilizing the correct version. + """ + blocks: dict[int, "Runtime"] block_hashes: dict[str, "Runtime"] versions: dict[int, "Runtime"] + last_used: Optional["Runtime"] def __init__(self): self.blocks = {} self.block_hashes = {} self.versions = {} + self.last_used = None def add_item( self, @@ -35,7 +49,11 @@ def add_item( block: Optional[int] = None, block_hash: Optional[str] = None, runtime_version: Optional[int] = None, - ): + ) -> None: + """ + Adds a Runtime object to the cache mapped to its version, block number, and/or block hash. 
+ """ + self.last_used = runtime if block is not None: self.blocks[block] = runtime if block_hash is not None: @@ -49,18 +67,35 @@ def retrieve( block_hash: Optional[str] = None, runtime_version: Optional[int] = None, ) -> Optional["Runtime"]: + """ + Retrieves a Runtime object from the cache, using the key of its block number, block hash, or runtime version. + Retrieval happens in this order. If no Runtime is found mapped to any of your supplied keys, returns `None`. + """ if block is not None: - return self.blocks.get(block) - elif block_hash is not None: - return self.block_hashes.get(block_hash) - elif runtime_version is not None: - return self.versions.get(runtime_version) - else: - return None + runtime = self.blocks.get(block) + if runtime is not None: + self.last_used = runtime + return runtime + if block_hash is not None: + runtime = self.block_hashes.get(block_hash) + if runtime is not None: + self.last_used = runtime + return runtime + if runtime_version is not None: + runtime = self.versions.get(runtime_version) + if runtime is not None: + self.last_used = runtime + return runtime + return None class Runtime: - runtime_version = None + """ + The Runtime object holds the necessary metadata and registry information required to do necessary scale encoding and + decoding. Currently only Metadata V15 is supported for decoding, though we plan to release legacy decoding options. + """ + + runtime_version: Optional[int] = None transaction_version = None cache_region = None metadata = None @@ -69,10 +104,12 @@ class Runtime: runtime_info = None type_registry_preset = None registry: Optional[PortableRegistry] = None + registry_type_map: dict[str, int] + type_id_to_name: dict[int, str] def __init__( self, - chain, + chain: str, runtime_config: RuntimeConfigurationObject, metadata, type_registry, @@ -90,97 +127,230 @@ def __init__( self.registry = registry self.runtime_version = runtime_info.get("specVersion") self.transaction_version = runtime_info.get("transactionVersion") + self.load_runtime() + if registry is not None: + self.load_registry_type_map() + + def load_runtime(self): + """ + Initial loading of the runtime's type registry information. + """ + # Update type registry + self.reload_type_registry(use_remote_preset=False, auto_discover=True) + + self.runtime_config.set_active_spec_version_id(self.runtime_version) + if self.implements_scaleinfo: + logger.debug("Adding PortableRegistry from metadata to type registry") + self.runtime_config.add_portable_registry(self.metadata) + # Set runtime compatibility flags + try: + _ = self.runtime_config.create_scale_object("sp_weights::weight_v2::Weight") + self.config["is_weight_v2"] = True + self.runtime_config.update_type_registry_types( + {"Weight": "sp_weights::weight_v2::Weight"} + ) + except NotImplementedError: + self.config["is_weight_v2"] = False + self.runtime_config.update_type_registry_types({"Weight": "WeightV1"}) + + @property + def implements_scaleinfo(self) -> Optional[bool]: + """ + Returns True if current runtime implements a `PortableRegistry` (`MetadataV14` and higher) + """ + if self.metadata: + return self.metadata.portable_registry is not None + else: + return None def __str__(self): return f"Runtime: {self.chain} | {self.config}" + def reload_type_registry( + self, use_remote_preset: bool = True, auto_discover: bool = True + ): + """ + Reload type registry and preset used to instantiate the SubstrateInterface object. 
+ """
+
+ runtime_version: Optional[int] = None transaction_version = None cache_region = None metadata = None @@ -69,10 +104,12 @@ class Runtime: runtime_info = None type_registry_preset = None registry: Optional[PortableRegistry] = None + registry_type_map: dict[str, int] + type_id_to_name: dict[int, str] def __init__( self, - chain, + chain: str, runtime_config: RuntimeConfigurationObject, metadata, type_registry, @@ -90,97 +127,230 @@ def __init__( self.registry = registry self.runtime_version = runtime_info.get("specVersion") self.transaction_version = runtime_info.get("transactionVersion") + self.load_runtime() + if registry is not None: + self.load_registry_type_map() + + def load_runtime(self): + """ + Initial loading of the runtime's type registry information. + """ + # Update type registry + self.reload_type_registry(use_remote_preset=False, auto_discover=True) + + self.runtime_config.set_active_spec_version_id(self.runtime_version) + if self.implements_scaleinfo: + logger.debug("Adding PortableRegistry from metadata to type registry") + self.runtime_config.add_portable_registry(self.metadata) + # Set runtime compatibility flags + try: + _ = self.runtime_config.create_scale_object("sp_weights::weight_v2::Weight") + self.config["is_weight_v2"] = True + self.runtime_config.update_type_registry_types( + {"Weight": "sp_weights::weight_v2::Weight"} + ) + except NotImplementedError: + self.config["is_weight_v2"] = False + self.runtime_config.update_type_registry_types({"Weight": "WeightV1"}) + + @property + def implements_scaleinfo(self) -> Optional[bool]: + """ + Returns True if the current runtime implements a `PortableRegistry` (`MetadataV14` and higher) + """ + if self.metadata: + return self.metadata.portable_registry is not None + else: + return None def __str__(self): return f"Runtime: {self.chain} | {self.config}" + def reload_type_registry( + self, use_remote_preset: bool = True, auto_discover: bool = True + ): + """ + Reload type registry and preset used to instantiate the SubstrateInterface object. Useful for periodically + applying changes in type definitions when a runtime upgrade has occurred + + Args: + use_remote_preset: When True, the preset is downloaded from GitHub master; otherwise files from the locally + installed scalecodec package are used + auto_discover: Whether to automatically discover the type registry presets based on the chain name and the + type registry + """ + self.runtime_config.clear_type_registry() + + self.runtime_config.implements_scale_info = self.implements_scaleinfo + + # Load metadata types in runtime configuration + self.runtime_config.update_type_registry(load_type_registry_preset(name="core")) + self.apply_type_registry_presets( + use_remote_preset=use_remote_preset, auto_discover=auto_discover + ) + + def apply_type_registry_presets( + self, + use_remote_preset: bool = True, + auto_discover: bool = True, + ): + """ + Applies type registry presets to the runtime + + Args: + use_remote_preset: whether to use presets from remote + auto_discover: whether to use presets from the locally installed scalecodec package + """ + if self.type_registry_preset is not None: + # Load type registry according to preset + type_registry_preset_dict = load_type_registry_preset( + name=self.type_registry_preset, use_remote_preset=use_remote_preset + ) + + if not type_registry_preset_dict: + raise ValueError( + f"Type registry preset '{self.type_registry_preset}' not found" + ) + + elif auto_discover: + # Try to auto discover type registry preset by chain name + type_registry_name = self.chain.lower().replace(" ", "-") + try: + type_registry_preset_dict = load_type_registry_preset( + type_registry_name + ) + self.type_registry_preset = type_registry_name + except ValueError: + type_registry_preset_dict = None + + else: + type_registry_preset_dict = None + + if type_registry_preset_dict: + # Load type registries in runtime configuration + if self.implements_scaleinfo is False: + # Only runtimes with no embedded types in their metadata need the default set of explicitly defined types + self.runtime_config.update_type_registry( + load_type_registry_preset( + "legacy", use_remote_preset=use_remote_preset + ) + ) + + if self.type_registry_preset != "legacy": + self.runtime_config.update_type_registry(type_registry_preset_dict) + + if self.type_registry: + # Load type registries in runtime configuration + self.runtime_config.update_type_registry(self.type_registry) + + def load_registry_type_map(self) -> None: + """ + Loads the runtime's type mapping according to its registry + """ + registry_type_map = {} + type_id_to_name = {} + types = json.loads(self.registry.registry)["types"] + type_by_id = {entry["id"]: entry for entry in types} + + # Pass 1: Gather simple types + for type_entry in types: + type_id = type_entry["id"] + type_def = type_entry["type"]["def"] + type_path = type_entry["type"].get("path") + if type_entry.get("params") or "variant" in type_def: + continue + if type_path: + type_name = type_path[-1] + registry_type_map[type_name] = type_id + type_id_to_name[type_id] = type_name + else: + # Possibly a primitive + if "primitive" in type_def: + prim_name = type_def["primitive"] + registry_type_map[prim_name] = type_id + type_id_to_name[type_id] = prim_name + + # Pass 2: Resolve remaining types + pending_ids = set(type_by_id.keys()) - set(type_id_to_name.keys()) + + def resolve_type_definition(type_id_): + type_entry_ = type_by_id[type_id_] + type_def_ = type_entry_["type"]["def"] + type_path_ = type_entry_["type"].get("path", []) + type_params = type_entry_["type"].get("params", []) + + if type_id_ in 
type_id_to_name: + return type_id_to_name[type_id_] + + # Resolve complex types with paths (including generics like Option etc) + if type_path_: + type_name_ = type_path_[-1] + if type_params: + inner_names = [] + for param in type_params: + dep_id = param["type"] + if dep_id not in type_id_to_name: + return None + inner_names.append(type_id_to_name[dep_id]) + return f"{type_name_}<{', '.join(inner_names)}>" + if "variant" in type_def_: + return None + return type_name_ + + elif "sequence" in type_def_: + sequence_type_id = type_def_["sequence"]["type"] + inner_type = type_id_to_name.get(sequence_type_id) + if inner_type: + type_name_ = f"Vec<{inner_type}>" + return type_name_ + + elif "array" in type_def_: + array_type_id = type_def_["array"]["type"] + inner_type = type_id_to_name.get(array_type_id) + maybe_len = type_def_["array"].get("len") + if inner_type: + if maybe_len: + type_name_ = f"[{inner_type}; {maybe_len}]" + else: + type_name_ = f"[{inner_type}]" + return type_name_ -# @property -# def implements_scaleinfo(self) -> bool: -# """ -# Returns True if current runtime implementation a `PortableRegistry` (`MetadataV14` and higher) -# """ -# if self.metadata: -# return self.metadata.portable_registry is not None -# else: -# return False -# -# def reload_type_registry( -# self, use_remote_preset: bool = True, auto_discover: bool = True -# ): -# """ -# Reload type registry and preset used to instantiate the SubstrateInterface object. Useful to periodically apply -# changes in type definitions when a runtime upgrade occurred -# -# Args: -# use_remote_preset: When True preset is downloaded from Github master, otherwise use files from local -# installed scalecodec package -# auto_discover: Whether to automatically discover the type registry presets based on the chain name and the -# type registry -# """ -# self.runtime_config.clear_type_registry() -# -# self.runtime_config.implements_scale_info = self.implements_scaleinfo -# -# # Load metadata types in runtime configuration -# self.runtime_config.update_type_registry(load_type_registry_preset(name="core")) -# self.apply_type_registry_presets( -# use_remote_preset=use_remote_preset, auto_discover=auto_discover -# ) -# -# def apply_type_registry_presets( -# self, -# use_remote_preset: bool = True, -# auto_discover: bool = True, -# ): -# """ -# Applies type registry presets to the runtime -# -# Args: -# use_remote_preset: whether to use presets from remote -# auto_discover: whether to use presets from local installed scalecodec package -# """ -# if self.type_registry_preset is not None: -# # Load type registry according to preset -# type_registry_preset_dict = load_type_registry_preset( -# name=self.type_registry_preset, use_remote_preset=use_remote_preset -# ) -# -# if not type_registry_preset_dict: -# raise ValueError( -# f"Type registry preset '{self.type_registry_preset}' not found" -# ) -# -# elif auto_discover: -# # Try to auto discover type registry preset by chain name -# type_registry_name = self.chain.lower().replace(" ", "-") -# try: -# type_registry_preset_dict = load_type_registry_preset( -# type_registry_name -# ) -# self.type_registry_preset = type_registry_name -# except ValueError: -# type_registry_preset_dict = None -# -# else: -# type_registry_preset_dict = None -# -# if type_registry_preset_dict: -# # Load type registries in runtime configuration -# if self.implements_scaleinfo is False: -# # Only runtime with no embedded types in metadata need the default set of explicit defined types -# 
self.runtime_config.update_type_registry( -# load_type_registry_preset( -# "legacy", use_remote_preset=use_remote_preset -# ) -# ) -# -# if self.type_registry_preset != "legacy": -# self.runtime_config.update_type_registry(type_registry_preset_dict) -# -# if self.type_registry: -# # Load type registries in runtime configuration -# self.runtime_config.update_type_registry(self.type_registry) + elif "compact" in type_def_: + compact_type_id = type_def_["compact"]["type"] + inner_type = type_id_to_name.get(compact_type_id) + if inner_type: + type_name_ = f"Compact<{inner_type}>" + return type_name_ + + elif "tuple" in type_def_: + tuple_type_ids = type_def_["tuple"] + type_names = [] + for inner_type_id in tuple_type_ids: + if inner_type_id not in type_id_to_name: + return None + type_names.append(type_id_to_name[inner_type_id]) + type_name_ = ", ".join(type_names) + type_name_ = f"({type_name_})" + return type_name_ + + elif "variant" in type_def_: + return None + + return None + + resolved_type = True + while resolved_type and pending_ids: + resolved_type = False + for type_id in list(pending_ids): + name = resolve_type_definition(type_id) + if name is not None: + type_id_to_name[type_id] = name + registry_type_map[name] = type_id + pending_ids.remove(type_id) + resolved_type = True + + self.registry_type_map = registry_type_map + self.type_id_to_name = type_id_to_name class RequestManager: @@ -373,40 +543,59 @@ class SubstrateMixin(ABC): type_registry: Optional[dict] ss58_format: Optional[int] ws_max_size = 2**32 - registry_type_map: dict[str, int] - type_id_to_name: dict[int, str] - runtime: Runtime = None + runtime: Runtime = None # TODO remove - @property - def chain(self): - """ - Returns the substrate chain currently associated with object - """ - return self._chain - - @property - def metadata(self): - if not self.runtime or self.runtime.metadata is None: - raise AttributeError( - "Metadata not found. This generally indicates that the AsyncSubstrateInterface object " - "is not properly async initialized." 
+ def __init__( + self, + type_registry: Optional[dict] = None, + type_registry_preset: Optional[str] = None, + use_remote_preset: bool = False, + ss58_format: Optional[int] = None, + ): + # We load a very basic RuntimeConfigurationObject that is only used for the initial metadata decoding + self.runtime_config = RuntimeConfigurationObject(ss58_format=ss58_format) + self.ss58_format = ss58_format + self.runtime_config.update_type_registry(load_type_registry_preset(name="core")) + if type_registry_preset is not None: + type_registry_preset_dict = load_type_registry_preset( + name=type_registry_preset, use_remote_preset=use_remote_preset ) + if not type_registry_preset_dict: + raise ValueError( + f"Type registry preset '{type_registry_preset}' not found" + ) else: - return self.runtime.metadata + type_registry_preset_dict = None + + if type_registry_preset_dict: + self.runtime_config.update_type_registry( + load_type_registry_preset("legacy", use_remote_preset=use_remote_preset) + ) + if type_registry_preset != "legacy": + self.runtime_config.update_type_registry(type_registry_preset_dict) + if type_registry: + # Load type registries in runtime configuration + self.runtime_config.update_type_registry(type_registry) + + def _runtime_config_copy(self, implements_scale_info: bool = False): + runtime_config = RuntimeConfigurationObject( + ss58_format=self.ss58_format, implements_scale_info=implements_scale_info + ) + runtime_config.active_spec_version_id = ( + self.runtime_config.active_spec_version_id + ) + runtime_config.chain_id = self.runtime_config.chain_id + # TODO: this works, but deepcopy does not, indicating the type registry gets mutated somewhere else. + runtime_config.type_registry = self.runtime_config.type_registry + assert runtime_config.type_registry == self.runtime_config.type_registry + return runtime_config @property - def implements_scaleinfo(self) -> Optional[bool]: + def chain(self): """ - Returns True if current runtime implementation a `PortableRegistry` (`MetadataV14` and higher) - - Returns - ------- - bool + Returns the substrate chain currently associated with this object """ - if self.runtime and self.runtime.metadata: - return self.runtime.metadata.portable_registry is not None - else: - return None + return self._chain def ss58_encode( self, public_key: Union[str, bytes], ss58_format: int = None @@ -454,7 +643,11 @@ def is_valid_ss58_address(self, value: str) -> bool: return is_valid_ss58_address(value, valid_ss58_format=self.ss58_format) def serialize_storage_item( - self, storage_item: ScaleType, module, spec_version_id + self, + storage_item: ScaleType, + module: str, + spec_version_id: int, + runtime: Optional[Runtime] = None, ) -> dict: """ Helper function to serialize a storage item @@ -463,10 +656,17 @@ def serialize_storage_item( storage_item: the storage item to serialize module: the module to use to serialize the storage item spec_version_id: the version id + runtime: the runtime to use when serializing the storage item Returns: dict """ + if not runtime: + runtime = self.runtime + metadata = self.metadata + else: + metadata = runtime.metadata + storage_dict = { "storage_name": storage_item.name, "storage_modifier": storage_item.modifier, @@ -497,10 +697,10 @@ def serialize_storage_item( query_value = storage_item.value_object["default"].value_object try: - obj = self.runtime_config.create_scale_object( + obj = runtime.runtime_config.create_scale_object( type_string=value_scale_type, data=ScaleBytes(query_value), - metadata=self.metadata, + metadata=metadata, ) obj.decode() 
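+ # keep the decoded default value alongside the rest of the serialized storage metadata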
storage_dict["storage_default"] = obj.decode() @@ -622,183 +822,6 @@ def serialize_module_error(module, error, spec_version) -> dict: "spec_version": spec_version, } - def _load_registry_type_map(self, registry): - registry_type_map = {} - type_id_to_name = {} - types = json.loads(registry.registry)["types"] - type_by_id = {entry["id"]: entry for entry in types} - - # Pass 1: Gather simple types - for type_entry in types: - type_id = type_entry["id"] - type_def = type_entry["type"]["def"] - type_path = type_entry["type"].get("path") - if type_entry.get("params") or "variant" in type_def: - continue - if type_path: - type_name = type_path[-1] - registry_type_map[type_name] = type_id - type_id_to_name[type_id] = type_name - else: - # Possibly a primitive - if "primitive" in type_def: - prim_name = type_def["primitive"] - registry_type_map[prim_name] = type_id - type_id_to_name[type_id] = prim_name - - # Pass 2: Resolve remaining types - pending_ids = set(type_by_id.keys()) - set(type_id_to_name.keys()) - - def resolve_type_definition(type_id_): - type_entry_ = type_by_id[type_id_] - type_def_ = type_entry_["type"]["def"] - type_path_ = type_entry_["type"].get("path", []) - type_params = type_entry_["type"].get("params", []) - - if type_id_ in type_id_to_name: - return type_id_to_name[type_id_] - - # Resolve complex types with paths (including generics like Option etc) - if type_path_: - type_name_ = type_path_[-1] - if type_params: - inner_names = [] - for param in type_params: - dep_id = param["type"] - if dep_id not in type_id_to_name: - return None - inner_names.append(type_id_to_name[dep_id]) - return f"{type_name_}<{', '.join(inner_names)}>" - if "variant" in type_def_: - return None - return type_name_ - - elif "sequence" in type_def_: - sequence_type_id = type_def_["sequence"]["type"] - inner_type = type_id_to_name.get(sequence_type_id) - if inner_type: - type_name_ = f"Vec<{inner_type}>" - return type_name_ - - elif "array" in type_def_: - array_type_id = type_def_["array"]["type"] - inner_type = type_id_to_name.get(array_type_id) - maybe_len = type_def_["array"].get("len") - if inner_type: - if maybe_len: - type_name_ = f"[{inner_type}; {maybe_len}]" - else: - type_name_ = f"[{inner_type}]" - return type_name_ - - elif "compact" in type_def_: - compact_type_id = type_def_["compact"]["type"] - inner_type = type_id_to_name.get(compact_type_id) - if inner_type: - type_name_ = f"Compact<{inner_type}>" - return type_name_ - - elif "tuple" in type_def_: - tuple_type_ids = type_def_["tuple"] - type_names = [] - for inner_type_id in tuple_type_ids: - if inner_type_id not in type_id_to_name: - return None - type_names.append(type_id_to_name[inner_type_id]) - type_name_ = ", ".join(type_names) - type_name_ = f"({type_name_})" - return type_name_ - - elif "variant" in type_def_: - return None - - return None - - resolved_type = True - while resolved_type and pending_ids: - resolved_type = False - for type_id in list(pending_ids): - name = resolve_type_definition(type_id) - if name is not None: - type_id_to_name[type_id] = name - registry_type_map[name] = type_id - pending_ids.remove(type_id) - resolved_type = True - - self.registry_type_map = registry_type_map - self.type_id_to_name = type_id_to_name - - def reload_type_registry( - self, use_remote_preset: bool = True, auto_discover: bool = True - ): - """ - Reload type registry and preset used to instantiate the `AsyncSubstrateInterface` object. 
Useful to - periodically apply changes in type definitions when a runtime upgrade occurred - - Args: - use_remote_preset: When True preset is downloaded from Github master, - otherwise use files from local installed scalecodec package - auto_discover: Whether to automatically discover the type_registry - presets based on the chain name and typer registry - """ - self.runtime_config.clear_type_registry() - - self.runtime_config.implements_scale_info = self.implements_scaleinfo - - # Load metadata types in runtime configuration - self.runtime_config.update_type_registry(load_type_registry_preset(name="core")) - self.apply_type_registry_presets( - use_remote_preset=use_remote_preset, auto_discover=auto_discover - ) - - def apply_type_registry_presets( - self, use_remote_preset: bool = True, auto_discover: bool = True - ): - if self.type_registry_preset is not None: - # Load type registry according to preset - type_registry_preset_dict = load_type_registry_preset( - name=self.type_registry_preset, use_remote_preset=use_remote_preset - ) - - if not type_registry_preset_dict: - raise ValueError( - f"Type registry preset '{self.type_registry_preset}' not found" - ) - - elif auto_discover: - # Try to auto discover type registry preset by chain name - type_registry_name = self.chain.lower().replace(" ", "-") - try: - type_registry_preset_dict = load_type_registry_preset( - type_registry_name - ) - logger.debug( - f"Auto set type_registry_preset to {type_registry_name} ..." - ) - self.type_registry_preset = type_registry_name - except ValueError: - type_registry_preset_dict = None - - else: - type_registry_preset_dict = None - - if type_registry_preset_dict: - # Load type registries in runtime configuration - if self.implements_scaleinfo is False: - # Only runtime with no embedded types in metadata need the default set of explicit defined types - self.runtime_config.update_type_registry( - load_type_registry_preset( - "legacy", use_remote_preset=use_remote_preset - ) - ) - - if self.type_registry_preset != "legacy": - self.runtime_config.update_type_registry(type_registry_preset_dict) - - if self.type_registry: - # Load type registries in runtime configuration - self.runtime_config.update_type_registry(self.type_registry) - def extension_call(self, name, **kwargs): raise NotImplementedError( "Extensions not implemented in AsyncSubstrateInterface" @@ -836,13 +859,16 @@ def make_payload(id_: str, method: str, params: list) -> dict: "payload": {"jsonrpc": "2.0", "method": method, "params": params}, } - def _encode_scale(self, type_string, value: Any) -> bytes: + def _encode_scale( + self, type_string, value: Any, runtime: Optional[Runtime] = None + ) -> bytes: """ Helper function to encode arbitrary data into SCALE-bytes for given RUST type_string Args: type_string: the type string of the SCALE object for decoding value: value to encode + runtime: Optional Runtime whose registry to use for encoding Returns: encoded bytes @@ -850,14 +876,16 @@ def _encode_scale(self, type_string, value: Any) -> bytes: if value is None: result = b"\x00" else: + if not runtime: + runtime = self.runtime try: vec_acct_id = ( - f"scale_info::{self.registry_type_map['Vec']}" + f"scale_info::{runtime.registry_type_map['Vec']}" ) except KeyError: vec_acct_id = "scale_info::152" try: - optional_acct_u16 = f"scale_info::{self.registry_type_map['Option<(AccountId32, u16)>']}" + optional_acct_u16 = f"scale_info::{runtime.registry_type_map['Option<(AccountId32, u16)>']}" except KeyError: optional_acct_u16 = "scale_info::579" @@ 
-902,12 +930,11 @@ def _encode_scale(self, type_string, value: Any) -> bytes: else: value = value.value # Unwrap the value of the type - result = bytes( - encode_by_type_string(type_string, self.runtime.registry, value) - ) + result = bytes(encode_by_type_string(type_string, runtime.registry, value)) return result - def _encode_account_id(self, account) -> bytes: + @staticmethod + def _encode_account_id(account) -> bytes: """Encode an account ID into bytes. Args: diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py index 4c582d8..88d23c0 100644 --- a/async_substrate_interface/utils/decoding.py +++ b/async_substrate_interface/utils/decoding.py @@ -1,6 +1,7 @@ from typing import Union, TYPE_CHECKING from bt_decode import AxonInfo, PrometheusInfo, decode_list +from scalecodec import ScaleBytes from async_substrate_interface.utils import hex_to_bytes from async_substrate_interface.types import ScaleObj @@ -55,10 +56,16 @@ def _bt_decode_to_dict_or_list(obj) -> Union[dict, list[dict]]: def _decode_scale_list_with_runtime( type_strings: list[str], scale_bytes_list: list[bytes], - runtime_registry, + runtime: "Runtime", return_scale_obj: bool = False, ): - obj = decode_list(type_strings, runtime_registry, scale_bytes_list) + if runtime.metadata_v15 is not None: + obj = decode_list(type_strings, runtime.registry, scale_bytes_list) + else: + obj = [ + legacy_scale_decode(x, y, runtime) + for (x, y) in zip(type_strings, scale_bytes_list) + ] if return_scale_obj: return [ScaleObj(x) for x in obj] else: @@ -109,7 +116,7 @@ def concat_hash_len(key_hasher: str) -> int: all_decoded = _decode_scale_list_with_runtime( pre_decoded_key_types + pre_decoded_value_types, pre_decoded_keys + pre_decoded_values, - runtime.registry, + runtime, ) middl_index = len(all_decoded) // 2 decoded_keys = all_decoded[:middl_index] @@ -132,3 +139,18 @@ def concat_hash_len(key_hasher: str) -> int: item_value = dv result.append([item_key, item_value]) return result + + +def legacy_scale_decode( + type_string: str, scale_bytes: Union[str, ScaleBytes], runtime: "Runtime" +): + if isinstance(scale_bytes, (str, bytes)): + scale_bytes = ScaleBytes(scale_bytes) + + obj = runtime.runtime_config.create_scale_object( + type_string=type_string, data=scale_bytes, metadata=runtime.metadata + ) + + obj.decode(check_remaining=runtime.config.get("strict_scale_decode")) + + return obj.value diff --git a/tests/helpers/settings.py b/tests/helpers/settings.py index ae1d7cb..ab11ca1 100644 --- a/tests/helpers/settings.py +++ b/tests/helpers/settings.py @@ -32,3 +32,5 @@ AURA_NODE_URL = ( environ.get("SUBSTRATE_AURA_NODE_URL") or "wss://acala-rpc-1.aca-api.network" ) + +ARCHIVE_ENTRYPOINT = "wss://archive.chain.opentensor.ai:443" diff --git a/tests/integration_tests/test_async_substrate_interface.py b/tests/integration_tests/test_async_substrate_interface.py new file mode 100644 index 0000000..afdf646 --- /dev/null +++ b/tests/integration_tests/test_async_substrate_interface.py @@ -0,0 +1,32 @@ +import pytest + +from async_substrate_interface.async_substrate import AsyncSubstrateInterface +from async_substrate_interface.types import ScaleObj +from tests.helpers.settings import ARCHIVE_ENTRYPOINT + + +@pytest.mark.asyncio +async def test_legacy_decoding(): + # roughly 4000 blocks before metadata v15 was added + pre_metadata_v15_block = 3_010_611 + + async with AsyncSubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate: + block_hash = await substrate.get_block_hash(pre_metadata_v15_block) + events = 
await substrate.get_events(block_hash) + assert isinstance(events, list) + + query_map_result = await substrate.query_map( + module="SubtensorModule", + storage_function="NetworksAdded", + block_hash=block_hash, + ) + async for key, value in query_map_result: + assert isinstance(key, int) + assert isinstance(value, ScaleObj) + + timestamp = await substrate.query( + "Timestamp", + "Now", + block_hash=block_hash, + ) + assert timestamp.value == 1716358476004 diff --git a/tests/integration_tests/test_substrate_interface.py b/tests/integration_tests/test_substrate_interface.py new file mode 100644 index 0000000..3af6e65 --- /dev/null +++ b/tests/integration_tests/test_substrate_interface.py @@ -0,0 +1,29 @@ +from async_substrate_interface.sync_substrate import SubstrateInterface +from async_substrate_interface.types import ScaleObj +from tests.helpers.settings import ARCHIVE_ENTRYPOINT + + +def test_legacy_decoding(): + # roughly 4000 blocks before metadata v15 was added + pre_metadata_v15_block = 3_010_611 + + with SubstrateInterface(ARCHIVE_ENTRYPOINT) as substrate: + block_hash = substrate.get_block_hash(pre_metadata_v15_block) + events = substrate.get_events(block_hash) + assert isinstance(events, list) + + query_map_result = substrate.query_map( + module="SubtensorModule", + storage_function="NetworksAdded", + block_hash=block_hash, + ) + for key, value in query_map_result: + assert isinstance(key, int) + assert isinstance(value, ScaleObj) + + timestamp = substrate.query( + "Timestamp", + "Now", + block_hash=block_hash, + ) + assert timestamp.value == 1716358476004 diff --git a/tests/unit_tests/asyncio_/test_substrate_interface.py b/tests/unit_tests/asyncio_/test_substrate_interface.py index a64d570..1ea30ef 100644 --- a/tests/unit_tests/asyncio_/test_substrate_interface.py +++ b/tests/unit_tests/asyncio_/test_substrate_interface.py @@ -1,5 +1,5 @@ import asyncio -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, ANY import pytest from websockets.exceptions import InvalidURI @@ -64,7 +64,7 @@ async def test_runtime_call(monkeypatch): # Patch RPC request with correct behavior substrate.rpc_request = AsyncMock( - side_effect=lambda method, params: { + side_effect=lambda method, params, runtime: { "result": "0x00" if method == "state_call" else {"parentHash": "0xDEADBEEF"} } ) @@ -83,14 +83,16 @@ async def test_runtime_call(monkeypatch): assert result.value == "decoded_result" # Check decode_scale called correctly - substrate.decode_scale.assert_called_once_with("scale_info::1", b"\x00") + substrate.decode_scale.assert_called_once_with( + "scale_info::1", b"\x00", runtime=ANY + ) # encode_scale should not be called since no inputs substrate.encode_scale.assert_not_called() # Check RPC request called for the state_call substrate.rpc_request.assert_any_call( - "state_call", ["SubstrateApi_SubstrateMethod", "", None] + "state_call", ["SubstrateApi_SubstrateMethod", "", None], runtime=ANY ) diff --git a/tests/unit_tests/sync/test_substrate_interface.py b/tests/unit_tests/sync/test_substrate_interface.py index 6d9c471..ea6d7b5 100644 --- a/tests/unit_tests/sync/test_substrate_interface.py +++ b/tests/unit_tests/sync/test_substrate_interface.py @@ -72,3 +72,4 @@ def test_runtime_call(monkeypatch): substrate.rpc_request.assert_any_call( "state_call", ["SubstrateApi_SubstrateMethod", "", None] ) + substrate.close() diff --git a/tests/unit_tests/test_cache.py b/tests/unit_tests/test_cache.py index dddb2e8..726c94c 100644 --- 
a/tests/unit_tests/test_cache.py +++ b/tests/unit_tests/test_cache.py @@ -71,8 +71,11 @@ async def error_method(x): @pytest.mark.asyncio async def test_cached_fetcher_eviction(): """Tests that LRU eviction works in CachedFetcher.""" - mock_method = mock.AsyncMock(side_effect=lambda x: f"val_{x}") - fetcher = CachedFetcher(max_size=2, method=mock_method) + + async def side_effect_method(x): + return f"val_{x}" + + fetcher = CachedFetcher(max_size=2, method=side_effect_method) # Fill cache await fetcher("key1")