diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 94abf59..d38cc53 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -30,6 +30,7 @@ GenericExtrinsic, GenericRuntimeCallDefinition, ss58_encode, + MultiAccountId, ) from websockets.asyncio.client import connect from websockets.exceptions import ConnectionClosed @@ -1039,6 +1040,130 @@ async def create_storage_key( metadata=self.runtime.metadata, ) + async def subscribe_storage( + self, + storage_keys: list[StorageKey], + subscription_handler: Callable[[StorageKey, Any, str], Awaitable[Any]], + ): + """ + + Subscribe to provided storage_keys and keep tracking until `subscription_handler` returns a value + + Example of a StorageKey: + ``` + StorageKey.create_from_storage_function( + "System", "Account", ["5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY"] + ) + ``` + + Example of a subscription handler: + ``` + async def subscription_handler(storage_key, obj, subscription_id): + if obj is not None: + # the subscription will run until your subscription_handler returns something other than `None` + return obj + ``` + + Args: + storage_keys: StorageKey list of storage keys to subscribe to + subscription_handler: coroutine function to handle value changes of subscription + + """ + await self.init_runtime() + + storage_key_map = {s.to_hex(): s for s in storage_keys} + + async def result_handler( + message: dict, subscription_id: str + ) -> tuple[bool, Optional[Any]]: + result_found = False + subscription_result = None + if "params" in message: + # Process changes + for change_storage_key, change_data in message["params"]["result"][ + "changes" + ]: + # Check for target storage key + storage_key = storage_key_map[change_storage_key] + + if change_data is not None: + change_scale_type = storage_key.value_scale_type + result_found = True + elif ( + storage_key.metadata_storage_function.value["modifier"] + == "Default" + ): + # Fallback to default value of storage function if no result + change_scale_type = storage_key.value_scale_type + change_data = ( + storage_key.metadata_storage_function.value_object[ + "default" + ].value_object + ) + else: + # No result is interpreted as an Option<...> result + change_scale_type = f"Option<{storage_key.value_scale_type}>" + change_data = ( + storage_key.metadata_storage_function.value_object[ + "default" + ].value_object + ) + + # Decode SCALE result data + updated_obj = await self.decode_scale( + type_string=change_scale_type, + scale_bytes=hex_to_bytes(change_data), + ) + + subscription_result = await subscription_handler( + storage_key, updated_obj, subscription_id + ) + + if subscription_result is not None: + # Handler returned end result: unsubscribe from further updates + self._forgettable_task = asyncio.create_task( + self.rpc_request( + "state_unsubscribeStorage", [subscription_id] + ) + ) + + return result_found, subscription_result + + if not callable(subscription_handler): + raise ValueError("Provided `subscription_handler` is not callable") + + return await self.rpc_request( + "state_subscribeStorage", + [[s.to_hex() for s in storage_keys]], + result_handler=result_handler, + ) + + async def retrieve_pending_extrinsics(self) -> list: + """ + Retrieves and decodes pending extrinsics from the node's transaction pool + + Returns: + list of extrinsics + """ + + runtime = await self.init_runtime() + + result_data = await self.rpc_request("author_pendingExtrinsics", []) + + extrinsics = [] + + for extrinsic_data in result_data["result"]: + extrinsic = runtime.runtime_config.create_scale_object( + "Extrinsic", metadata=runtime.metadata + ) + extrinsic.decode( + ScaleBytes(extrinsic_data), + check_remaining=self.config.get("strict_scale_decode"), + ) + extrinsics.append(extrinsic) + + return extrinsics + async def get_metadata_storage_functions(self, block_hash=None) -> list: """ Retrieves a list of all storage functions in metadata active at given block_hash (or chaintip if block_hash is @@ -1193,6 +1318,41 @@ async def get_metadata_runtime_call_function( return runtime_call_def_obj + async def get_metadata_runtime_call_function( + self, api: str, method: str + ) -> GenericRuntimeCallDefinition: + """ + Get details of a runtime API call + + Args: + api: Name of the runtime API e.g. 'TransactionPaymentApi' + method: Name of the method e.g. 'query_fee_details' + + Returns: + GenericRuntimeCallDefinition + """ + await self.init_runtime(block_hash=block_hash) + + try: + runtime_call_def = self.runtime_config.type_registry["runtime_api"][api][ + "methods" + ][method] + runtime_call_def["api"] = api + runtime_call_def["method"] = method + runtime_api_types = self.runtime_config.type_registry["runtime_api"][ + api + ].get("types", {}) + except KeyError: + raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") + + # Add runtime API types to registry + self.runtime_config.update_type_registry_types(runtime_api_types) + + runtime_call_def_obj = await self.create_scale_object("RuntimeCallDefinition") + runtime_call_def_obj.encode(runtime_call_def) + + return runtime_call_def_obj + async def _get_block_handler( self, block_hash: str, @@ -1669,6 +1829,21 @@ def convert_event_data(data): events.append(convert_event_data(item)) return events + async def get_metadata(self, block_hash=None) -> MetadataV15: + """ + Returns `MetadataVersioned` object for given block_hash or chaintip if block_hash is omitted + + + Args: + block_hash + + Returns: + MetadataVersioned + """ + runtime = await self.init_runtime(block_hash=block_hash) + + return runtime.metadata_v15 + @a.lru_cache(maxsize=512) async def get_parent_block_hash(self, block_hash): return await self._get_parent_block_hash(block_hash) @@ -1685,10 +1860,43 @@ async def _get_parent_block_hash(self, block_hash): return block_hash return parent_block_hash + async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any: + """ + A pass-though to existing JSONRPC method `state_getStorage`/`state_getStorageAt` + + Args: + block_hash: hash of the block + storage_key: storage key to query + + Returns: + result of the query + + """ + + if await self.supports_rpc_method("state_getStorageAt"): + response = await self.rpc_request( + "state_getStorageAt", [storage_key, block_hash] + ) + else: + response = await self.rpc_request( + "state_getStorage", [storage_key, block_hash] + ) + + if "result" in response: + return response.get("result") + elif "error" in response: + raise SubstrateRequestException(response["error"]["message"]) + else: + raise SubstrateRequestException( + "Unknown error occurred during retrieval of events" + ) + @a.lru_cache(maxsize=16) async def get_block_runtime_info(self, block_hash: str) -> dict: return await self._get_block_runtime_info(block_hash) + get_block_runtime_version = get_block_runtime_info + async def _get_block_runtime_info(self, block_hash: str) -> dict: """ Retrieve the runtime info of given block_hash @@ -2443,6 +2651,34 @@ async def create_signed_extrinsic( return extrinsic + async def create_unsigned_extrinsic(self, call: GenericCall) -> GenericExtrinsic: + """ + Create unsigned extrinsic for given `Call` + + Args: + call: GenericCall the call the extrinsic should contain + + Returns: + GenericExtrinsic + """ + + runtime = await self.init_runtime() + + # Create extrinsic + extrinsic = self.runtime_config.create_scale_object( + type_string="Extrinsic", metadata=runtime.metadata + ) + + extrinsic.encode( + { + "call_function": call.value["call_function"], + "call_module": call.value["call_module"], + "call_args": call.value["call_args"], + } + ) + + return extrinsic + async def get_chain_finalised_head(self): """ A pass-though to existing JSONRPC method `chain_getFinalizedHead` @@ -2627,6 +2863,29 @@ async def get_account_next_index(self, account_address: str) -> int: self._nonces[account_address] += 1 return self._nonces[account_address] + async def get_metadata_constants(self, block_hash=None) -> list[dict]: + """ + Retrieves a list of all constants in metadata active at given block_hash (or chaintip if block_hash is omitted) + + Args: + block_hash: hash of the block + + Returns: + list of constants + """ + + runtime = await self.init_runtime(block_hash=block_hash) + + constant_list = [] + + for module_idx, module in enumerate(self.metadata.pallets): + for constant in module.constants or []: + constant_list.append( + self.serialize_constant(constant, module, runtime.runtime_version) + ) + + return constant_list + async def get_metadata_constant(self, module_name, constant_name, block_hash=None): """ Retrieves the details of a constant for given module name, call function name and block_hash @@ -2994,6 +3253,100 @@ async def query_map( ignore_decoding_errors=ignore_decoding_errors, ) + async def create_multisig_extrinsic( + self, + call: GenericCall, + keypair: Keypair, + multisig_account: MultiAccountId, + max_weight: Optional[Union[dict, int]] = None, + era: dict = None, + nonce: int = None, + tip: int = 0, + tip_asset_id: int = None, + signature: Union[bytes, str] = None, + ) -> GenericExtrinsic: + """ + Create a Multisig extrinsic that will be signed by one of the signatories. Checks on-chain if the threshold + of the multisig account is reached and try to execute the call accordingly. + + Args: + call: GenericCall to create extrinsic for + keypair: Keypair of the signatory to approve given call + multisig_account: MultiAccountId to use of origin of the extrinsic (see `generate_multisig_account()`) + max_weight: Maximum allowed weight to execute the call ( Uses `get_payment_info()` by default) + era: Specify mortality in blocks in follow format: {'period': [amount_blocks]} If omitted the extrinsic is + immortal + nonce: nonce to include in extrinsics, if omitted the current nonce is retrieved on-chain + tip: The tip for the block author to gain priority during network congestion + tip_asset_id: Optional asset ID with which to pay the tip + signature: Optionally provide signature if externally signed + + Returns: + GenericExtrinsic + """ + if max_weight is None: + payment_info = await self.get_payment_info(call, keypair) + max_weight = payment_info["weight"] + + # Check if call has existing approvals + multisig_details_ = await self.query( + "Multisig", "Multisigs", [multisig_account.value, call.call_hash] + ) + multisig_details = getattr(multisig_details_, "value", multisig_details_) + if multisig_details: + maybe_timepoint = multisig_details["when"] + else: + maybe_timepoint = None + + # Compose 'as_multi' when final, 'approve_as_multi' otherwise + if ( + multisig_details.value + and len(multisig_details.value["approvals"]) + 1 + == multisig_account.threshold + ): + multi_sig_call = await self.compose_call( + "Multisig", + "as_multi", + { + "other_signatories": [ + s + for s in multisig_account.signatories + if s != f"0x{keypair.public_key.hex()}" + ], + "threshold": multisig_account.threshold, + "maybe_timepoint": maybe_timepoint, + "call": call, + "store_call": False, + "max_weight": max_weight, + }, + ) + else: + multi_sig_call = await self.compose_call( + "Multisig", + "approve_as_multi", + { + "other_signatories": [ + s + for s in multisig_account.signatories + if s != f"0x{keypair.public_key.hex()}" + ], + "threshold": multisig_account.threshold, + "maybe_timepoint": maybe_timepoint, + "call_hash": call.call_hash, + "max_weight": max_weight, + }, + ) + + return await self.create_signed_extrinsic( + multi_sig_call, + keypair, + era=era, + nonce=nonce, + tip=tip, + tip_asset_id=tip_asset_id, + signature=signature, + ) + async def submit_extrinsic( self, extrinsic: GenericExtrinsic, @@ -3136,6 +3489,55 @@ async def get_metadata_call_function( return call return None + async def get_metadata_events(self, block_hash=None) -> list[dict]: + """ + Retrieves a list of all events in metadata active for given block_hash (or chaintip if block_hash is omitted) + + Args: + block_hash + + Returns: + list of module events + """ + + runtime = await self.init_runtime(block_hash=block_hash) + + event_list = [] + + for event_index, (module, event) in self.metadata.event_index.items(): + event_list.append( + self.serialize_module_event( + module, event, runtime.runtime_version, event_index + ) + ) + + return event_list + + async def get_metadata_event( + self, module_name, event_name, block_hash=None + ) -> Optional[Any]: + """ + Retrieves the details of an event for given module name, call function name and block_hash + (or chaintip if block_hash is omitted) + + Args: + module_name: name of the module to call + event_name: name of the event + block_hash: hash of the block + + Returns: + Metadata event + + """ + + runtime = await self.init_runtime(block_hash=block_hash) + + for pallet in runtime.metadata.pallets: + if pallet.name == module_name and pallet.events: + for event in pallet.events: + if event.name == event_name: + return event + async def get_block_number(self, block_hash: Optional[str] = None) -> int: """Async version of `substrateinterface.base.get_block_number` method.""" response = await self.rpc_request("chain_getHeader", [block_hash]) diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index a3a9f4d..45eaddb 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -11,6 +11,7 @@ GenericExtrinsic, GenericRuntimeCallDefinition, ss58_encode, + MultiAccountId, ) from scalecodec.base import RuntimeConfigurationObject, ScaleBytes, ScaleType from websockets.sync.client import connect @@ -796,6 +797,126 @@ def create_storage_key( metadata=self.runtime.metadata, ) + def subscribe_storage( + self, + storage_keys: list[StorageKey], + subscription_handler: Callable[[StorageKey, Any, str], Any], + ): + """ + + Subscribe to provided storage_keys and keep tracking until `subscription_handler` returns a value + + Example of a StorageKey: + ``` + StorageKey.create_from_storage_function( + "System", "Account", ["5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY"] + ) + ``` + + Example of a subscription handler: + ``` + def subscription_handler(storage_key, obj, subscription_id): + if obj is not None: + # the subscription will run until your subscription_handler returns something other than `None` + return obj + ``` + + Args: + storage_keys: StorageKey list of storage keys to subscribe to + subscription_handler: function to handle value changes of subscription + + """ + self.init_runtime() + + storage_key_map = {s.to_hex(): s for s in storage_keys} + + def result_handler( + message: dict, subscription_id: str + ) -> tuple[bool, Optional[Any]]: + result_found = False + subscription_result = None + if "params" in message: + # Process changes + for change_storage_key, change_data in message["params"]["result"][ + "changes" + ]: + # Check for target storage key + storage_key = storage_key_map[change_storage_key] + + if change_data is not None: + change_scale_type = storage_key.value_scale_type + result_found = True + elif ( + storage_key.metadata_storage_function.value["modifier"] + == "Default" + ): + # Fallback to default value of storage function if no result + change_scale_type = storage_key.value_scale_type + change_data = ( + storage_key.metadata_storage_function.value_object[ + "default" + ].value_object + ) + else: + # No result is interpreted as an Option<...> result + change_scale_type = f"Option<{storage_key.value_scale_type}>" + change_data = ( + storage_key.metadata_storage_function.value_object[ + "default" + ].value_object + ) + + # Decode SCALE result data + updated_obj = self.decode_scale( + type_string=change_scale_type, + scale_bytes=hex_to_bytes(change_data), + ) + + subscription_result = subscription_handler( + storage_key, updated_obj, subscription_id + ) + + if subscription_result is not None: + # Handler returned end result: unsubscribe from further updates + self.rpc_request("state_unsubscribeStorage", [subscription_id]) + + return result_found, subscription_result + + if not callable(subscription_handler): + raise ValueError("Provided `subscription_handler` is not callable") + + return self.rpc_request( + "state_subscribeStorage", + [[s.to_hex() for s in storage_keys]], + result_handler=result_handler, + ) + + def retrieve_pending_extrinsics(self) -> list: + """ + Retrieves and decodes pending extrinsics from the node's transaction pool + + Returns: + list of extrinsics + """ + + runtime = self.init_runtime() + + result_data = self.rpc_request("author_pendingExtrinsics", []) + + extrinsics = [] + + for extrinsic_data in result_data["result"]: + extrinsic = runtime.runtime_config.create_scale_object( + "Extrinsic", metadata=runtime.metadata + ) + extrinsic.decode( + ScaleBytes(extrinsic_data), + check_remaining=self.config.get("strict_scale_decode"), + ) + extrinsics.append(extrinsic) + + return extrinsics + def get_metadata_storage_functions(self, block_hash=None) -> list: """ Retrieves a list of all storage functions in metadata active at given block_hash (or chaintip if block_hash is @@ -946,6 +1067,41 @@ def get_metadata_runtime_call_function( return runtime_call_def_obj + def get_metadata_runtime_call_function( + self, api: str, method: str + ) -> GenericRuntimeCallDefinition: + """ + Get details of a runtime API call + + Args: + api: Name of the runtime API e.g. 'TransactionPaymentApi' + method: Name of the method e.g. 'query_fee_details' + + Returns: + GenericRuntimeCallDefinition + """ + self.init_runtime() + + try: + runtime_call_def = self.runtime_config.type_registry["runtime_api"][api][ + "methods" + ][method] + runtime_call_def["api"] = api + runtime_call_def["method"] = method + runtime_api_types = self.runtime_config.type_registry["runtime_api"][ + api + ].get("types", {}) + except KeyError: + raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry") + + # Add runtime API types to registry + self.runtime_config.update_type_registry_types(runtime_api_types) + + runtime_call_def_obj = self.create_scale_object("RuntimeCallDefinition") + runtime_call_def_obj.encode(runtime_call_def) + + return runtime_call_def_obj + def _get_block_handler( self, block_hash: str, @@ -1416,6 +1572,21 @@ def convert_event_data(data): events.append(convert_event_data(item)) return events + def get_metadata(self, block_hash=None) -> MetadataV15: + """ + Returns `MetadataVersioned` object for given block_hash or chaintip if block_hash is omitted + + + Args: + block_hash + + Returns: + MetadataVersioned + """ + runtime = self.init_runtime(block_hash=block_hash) + + return runtime.metadata_v15 + @functools.lru_cache(maxsize=512) def get_parent_block_hash(self, block_hash): block_header = self.rpc_request("chain_getHeader", [block_hash]) @@ -1429,6 +1600,33 @@ def get_parent_block_hash(self, block_hash): return block_hash return parent_block_hash + def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any: + """ + A pass-though to existing JSONRPC method `state_getStorage`/`state_getStorageAt` + + Args: + block_hash: hash of the block + storage_key: storage key to query + + Returns: + result of the query + + """ + + if self.supports_rpc_method("state_getStorageAt"): + response = self.rpc_request("state_getStorageAt", [storage_key, block_hash]) + else: + response = self.rpc_request("state_getStorage", [storage_key, block_hash]) + + if "result" in response: + return response.get("result") + elif "error" in response: + raise SubstrateRequestException(response["error"]["message"]) + else: + raise SubstrateRequestException( + "Unknown error occurred during retrieval of events" + ) + @functools.lru_cache(maxsize=16) def get_block_runtime_info(self, block_hash: str) -> dict: """ @@ -1437,6 +1635,8 @@ def get_block_runtime_info(self, block_hash: str) -> dict: response = self.rpc_request("state_getRuntimeVersion", [block_hash]) return response.get("result") + get_block_runtime_version = get_block_runtime_info + @functools.lru_cache(maxsize=512) def get_block_runtime_version_for(self, block_hash: str): """ @@ -2161,6 +2361,34 @@ def create_signed_extrinsic( return extrinsic + def create_unsigned_extrinsic(self, call: GenericCall) -> GenericExtrinsic: + """ + Create unsigned extrinsic for given `Call` + + Args: + call: GenericCall the call the extrinsic should contain + + Returns: + GenericExtrinsic + """ + + runtime = self.init_runtime() + + # Create extrinsic + extrinsic = self.runtime_config.create_scale_object( + type_string="Extrinsic", metadata=runtime.metadata + ) + + extrinsic.encode( + { + "call_function": call.value["call_function"], + "call_module": call.value["call_module"], + "call_args": call.value["call_args"], + } + ) + + return extrinsic + def get_chain_finalised_head(self): """ A pass-though to existing JSONRPC method `chain_getFinalizedHead` @@ -2334,6 +2562,29 @@ def get_account_next_index(self, account_address: str) -> int: nonce_obj = self.rpc_request("account_nextIndex", [account_address]) return nonce_obj["result"] + def get_metadata_constants(self, block_hash=None) -> list[dict]: + """ + Retrieves a list of all constants in metadata active at given block_hash (or chaintip if block_hash is omitted) + + Args: + block_hash: hash of the block + + Returns: + list of constants + """ + + runtime = self.init_runtime(block_hash=block_hash) + + constant_list = [] + + for module_idx, module in enumerate(self.metadata.pallets): + for constant in module.constants or []: + constant_list.append( + self.serialize_constant(constant, module, runtime.runtime_version) + ) + + return constant_list + def get_metadata_constant(self, module_name, constant_name, block_hash=None): """ Retrieves the details of a constant for given module name, call function name and block_hash @@ -2698,6 +2949,100 @@ def query_map( ignore_decoding_errors=ignore_decoding_errors, ) + def create_multisig_extrinsic( + self, + call: GenericCall, + keypair: Keypair, + multisig_account: MultiAccountId, + max_weight: Optional[Union[dict, int]] = None, + era: dict = None, + nonce: int = None, + tip: int = 0, + tip_asset_id: int = None, + signature: Union[bytes, str] = None, + ) -> GenericExtrinsic: + """ + Create a Multisig extrinsic that will be signed by one of the signatories. Checks on-chain if the threshold + of the multisig account is reached and try to execute the call accordingly. + + Args: + call: GenericCall to create extrinsic for + keypair: Keypair of the signatory to approve given call + multisig_account: MultiAccountId to use of origin of the extrinsic (see `generate_multisig_account()`) + max_weight: Maximum allowed weight to execute the call ( Uses `get_payment_info()` by default) + era: Specify mortality in blocks in follow format: {'period': [amount_blocks]} If omitted the extrinsic is + immortal + nonce: nonce to include in extrinsics, if omitted the current nonce is retrieved on-chain + tip: The tip for the block author to gain priority during network congestion + tip_asset_id: Optional asset ID with which to pay the tip + signature: Optionally provide signature if externally signed + + Returns: + GenericExtrinsic + """ + if max_weight is None: + payment_info = self.get_payment_info(call, keypair) + max_weight = payment_info["weight"] + + # Check if call has existing approvals + multisig_details = self.query( + "Multisig", "Multisigs", [multisig_account.value, call.call_hash] + ) + + if multisig_details.value: + maybe_timepoint = multisig_details.value["when"] + else: + maybe_timepoint = None + + # Compose 'as_multi' when final, 'approve_as_multi' otherwise + if ( + multisig_details.value + and len(multisig_details.value["approvals"]) + 1 + == multisig_account.threshold + ): + multi_sig_call = self.compose_call( + "Multisig", + "as_multi", + { + "other_signatories": [ + s + for s in multisig_account.signatories + if s != f"0x{keypair.public_key.hex()}" + ], + "threshold": multisig_account.threshold, + "maybe_timepoint": maybe_timepoint, + "call": call, + "store_call": False, + "max_weight": max_weight, + }, + ) + else: + multi_sig_call = self.compose_call( + "Multisig", + "approve_as_multi", + { + "other_signatories": [ + s + for s in multisig_account.signatories + if s != f"0x{keypair.public_key.hex()}" + ], + "threshold": multisig_account.threshold, + "maybe_timepoint": maybe_timepoint, + "call_hash": call.call_hash, + "max_weight": max_weight, + }, + ) + + return self.create_signed_extrinsic( + multi_sig_call, + keypair, + era=era, + nonce=nonce, + tip=tip, + tip_asset_id=tip_asset_id, + signature=signature, + ) + def submit_extrinsic( self, extrinsic: GenericExtrinsic, @@ -2832,6 +3177,55 @@ def get_metadata_call_function( return call return None + def get_metadata_events(self, block_hash=None) -> list[dict]: + """ + Retrieves a list of all events in metadata active for given block_hash (or chaintip if block_hash is omitted) + + Args: + block_hash + + Returns: + list of module events + """ + + runtime = self.init_runtime(block_hash=block_hash) + + event_list = [] + + for event_index, (module, event) in self.metadata.event_index.items(): + event_list.append( + self.serialize_module_event( + module, event, runtime.runtime_version, event_index + ) + ) + + return event_list + + def get_metadata_event( + self, module_name, event_name, block_hash=None + ) -> Optional[Any]: + """ + Retrieves the details of an event for given module name, call function name and block_hash + (or chaintip if block_hash is omitted) + + Args: + module_name: name of the module to call + event_name: name of the event + block_hash: hash of the block + + Returns: + Metadata event + + """ + + runtime = self.init_runtime(block_hash=block_hash) + + for pallet in runtime.metadata.pallets: + if pallet.name == module_name and pallet.events: + for event in pallet.events: + if event.name == event_name: + return event + def get_block_number(self, block_hash: Optional[str] = None) -> int: """Async version of `substrateinterface.base.get_block_number` method.""" response = self.rpc_request("chain_getHeader", [block_hash]) diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py index 754b860..261f73a 100644 --- a/async_substrate_interface/types.py +++ b/async_substrate_interface/types.py @@ -11,7 +11,7 @@ from scalecodec import ss58_encode, ss58_decode, is_valid_ss58_address from scalecodec.base import RuntimeConfigurationObject, ScaleBytes from scalecodec.type_registry import load_type_registry_preset -from scalecodec.types import GenericCall, ScaleType +from scalecodec.types import GenericCall, ScaleType, MultiAccountId from .utils import json @@ -919,3 +919,27 @@ def _encode_account_id(self, account) -> bytes: if isinstance(account, bytes): return account # Already encoded return bytes.fromhex(ss58_decode(account, SS58_FORMAT)) # SS58 string + + def generate_multisig_account( + self, signatories: list, threshold: int + ) -> MultiAccountId: + """ + Generate deterministic Multisig account with supplied signatories and threshold + + Args: + signatories: List of signatories + threshold: Amount of approvals needed to execute + + Returns: + MultiAccountId + """ + + multi_sig_account = MultiAccountId.create_from_account_list( + signatories, threshold + ) + + multi_sig_account.ss58_address = ss58_encode( + multi_sig_account.value.replace("0x", ""), self.ss58_format + ) + + return multi_sig_account