diff --git a/client/src/ledger_app_clients/ethereum/command_builder.py b/client/src/ledger_app_clients/ethereum/command_builder.py index 1467efe1f..97c4fcd76 100644 --- a/client/src/ledger_app_clients/ethereum/command_builder.py +++ b/client/src/ledger_app_clients/ethereum/command_builder.py @@ -7,7 +7,7 @@ from typing import Optional from ragger.bip import pack_derivation_path -from .eip712 import EIP712FieldType +from .eip712.struct import EIP712FieldType, EIP712TypeDescOffset class InsType(IntEnum): @@ -70,6 +70,9 @@ class P2Type(IntEnum): FILTERING_RAW = 0xff NETWORK_CONFIG = 0x00 NETWORK_ICON = 0x01 + SIGN_PROCESS_START = 0x00 + SIGN_STORE = 0x01 + SIGN_START = 0x02 class CommandBuilder: @@ -113,9 +116,9 @@ def eip712_send_struct_def_struct_field(self, key_name: str) -> bytes: data = bytearray() typedesc = 0 - typedesc |= (len(array_levels) > 0) << 7 - typedesc |= (type_size is not None) << 6 - typedesc |= field_type + typedesc |= (len(array_levels) > 0) << EIP712TypeDescOffset.ARRAY + typedesc |= (type_size is not None) << EIP712TypeDescOffset.SIZE + typedesc |= field_type << EIP712TypeDescOffset.TYPE data.append(typedesc) if field_type == EIP712FieldType.CUSTOM: data.append(len(type_name)) diff --git a/client/src/ledger_app_clients/ethereum/eip712/struct.py b/client/src/ledger_app_clients/ethereum/eip712/struct.py index fb0785ee8..6d12c96b9 100644 --- a/client/src/ledger_app_clients/ethereum/eip712/struct.py +++ b/client/src/ledger_app_clients/ethereum/eip712/struct.py @@ -1,6 +1,18 @@ from enum import IntEnum, auto +class EIP712TypeDescOffset(IntEnum): + ARRAY = 7 + SIZE = 6 + TYPE = 0 + + +class EIP712TypeDescMask(IntEnum): + ARRAY = (0b1 << EIP712TypeDescOffset.ARRAY) + SIZE = (0b1 << EIP712TypeDescOffset.SIZE) + TYPE = (0b1111 << EIP712TypeDescOffset.TYPE) + + class EIP712FieldType(IntEnum): CUSTOM = 0 INT = auto() diff --git a/client/src/ledger_app_clients/ethereum/gcs.py b/client/src/ledger_app_clients/ethereum/gcs.py index 092b04f17..d0a6e28e0 100644 --- a/client/src/ledger_app_clients/ethereum/gcs.py +++ b/client/src/ledger_app_clients/ethereum/gcs.py @@ -7,6 +7,21 @@ from .client import TrustedNameType, TrustedNameSource +class TxInfoTag(IntEnum): + VERSION = 0x00 + CHAIN_ID = 0x01 + CONTRACT_ADDR = 0x02 + SELECTOR = 0x03 + FIELDS_HASH = 0x04 + OPERATION_TYPE = 0x05 + CREATOR_NAME = 0x06 + CREATOR_LEGAL_NAME = 0x07 + CREATOR_URL = 0x08 + CONTRACT_NAME = 0x09 + DEPLOY_DATE = 0x0a + SIGNATURE = 0xff + + class TxInfo(TlvSerializable): version: int chain_id: int @@ -49,26 +64,26 @@ def __init__(self, def serialize(self) -> bytes: payload = bytearray() - payload += self.serialize_field(0x00, self.version) - payload += self.serialize_field(0x01, self.chain_id) - payload += self.serialize_field(0x02, self.contract_addr) - payload += self.serialize_field(0x03, self.selector) - payload += self.serialize_field(0x04, self.fields_hash) - payload += self.serialize_field(0x05, self.operation_type) + payload += self.serialize_field(TxInfoTag.VERSION, self.version) + payload += self.serialize_field(TxInfoTag.CHAIN_ID, self.chain_id) + payload += self.serialize_field(TxInfoTag.CONTRACT_ADDR, self.contract_addr) + payload += self.serialize_field(TxInfoTag.SELECTOR, self.selector) + payload += self.serialize_field(TxInfoTag.FIELDS_HASH, self.fields_hash) + payload += self.serialize_field(TxInfoTag.OPERATION_TYPE, self.operation_type) if self.creator_name is not None: - payload += self.serialize_field(0x06, self.creator_name) + payload += self.serialize_field(TxInfoTag.CREATOR_NAME, self.creator_name) if self.creator_legal_name is not None: - payload += self.serialize_field(0x07, self.creator_legal_name) + payload += self.serialize_field(TxInfoTag.CREATOR_LEGAL_NAME, self.creator_legal_name) if self.creator_url is not None: - payload += self.serialize_field(0x08, self.creator_url) + payload += self.serialize_field(TxInfoTag.CREATOR_URL, self.creator_url) if self.contract_name is not None: - payload += self.serialize_field(0x09, self.contract_name) + payload += self.serialize_field(TxInfoTag.CONTRACT_NAME, self.contract_name) if self.deploy_date is not None: - payload += self.serialize_field(0x0a, self.deploy_date) + payload += self.serialize_field(TxInfoTag.DEPLOY_DATE, self.deploy_date) signature = self.signature if signature is None: signature = sign_data(Key.CALLDATA, payload) - payload += self.serialize_field(0xff, signature) + payload += self.serialize_field(TxInfoTag.SIGNATURE, signature) return payload @@ -529,6 +544,13 @@ def serialize(self) -> bytes: return payload +class FieldTag(IntEnum): + VERSION = 0x00 + NAME = 0x01 + PARAM_TYPE = 0x02 + PARAM = 0x03 + + class Field(TlvSerializable): version: int name: str @@ -541,8 +563,8 @@ def __init__(self, version: int, name: str, param: FieldParam): def serialize(self) -> bytes: payload = bytearray() - payload += self.serialize_field(0x00, self.version) - payload += self.serialize_field(0x01, self.name) - payload += self.serialize_field(0x02, self.param.type) - payload += self.serialize_field(0x03, self.param.serialize()) + payload += self.serialize_field(FieldTag.VERSION, self.version) + payload += self.serialize_field(FieldTag.NAME, self.name) + payload += self.serialize_field(FieldTag.PARAM_TYPE, self.param.type) + payload += self.serialize_field(FieldTag.PARAM, self.param.serialize()) return payload diff --git a/client/src/ledger_app_clients/ethereum/tlv.py b/client/src/ledger_app_clients/ethereum/tlv.py index a78830b10..cce3fc63c 100644 --- a/client/src/ledger_app_clients/ethereum/tlv.py +++ b/client/src/ledger_app_clients/ethereum/tlv.py @@ -17,6 +17,9 @@ class FieldTag(IntEnum): TICKER = 0x24 TX_HASH = 0x27 DOMAIN_HASH = 0x28 + SELECTOR = 0x41 + IMPL_ADDRESS = 0x42 + DELEGATION_TYPE = 0x43 BLOCKCHAIN_FAMILY = 0x51 NETWORK_NAME = 0x52 NETWORK_ICON_HASH = 0x53 diff --git a/tests/ragger/fields_utils.py b/tests/ragger/fields_utils.py new file mode 100644 index 000000000..8570c03aa --- /dev/null +++ b/tests/ragger/fields_utils.py @@ -0,0 +1,432 @@ +import json +from typing import List +from client.gcs import PathTuple, PathRef, PathArray, PathLeaf, PathLeafType + + +def is_dynamic_type(param_type: str, param: dict) -> bool: + """Check if a parameter type is dynamic (needs offset/reference).""" + # Dynamic arrays + if param_type.endswith('[]'): + return True + + # Dynamic bytes and string + if param_type in ['string', 'bytes']: + return True + + # Tuples containing dynamic types + if 'components' in param: + return any(is_dynamic_type(c['type'], c) for c in param['components']) + + return False + + +def get_function_params(abi_filename: str, function_name: str) -> List[str]: + """ + Get list of parameter names for a function. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + + Returns: + List of parameter names + """ + with open(abi_filename, 'r', encoding='utf-8') as f: + abi = json.load(f) + + # Find the function in ABI + for item in abi: + if item.get('type') == 'function' and item.get('name') == function_name: + return [p.get('name', f'param_{i}') for i, p in enumerate(item.get('inputs', []))] + + raise ValueError(f"Function '{function_name}' not found in ABI") + + +def get_tuple_fields(abi_filename: str, function_name: str, tuple_param: str) -> List[str]: + """ + Get list of field names inside a tuple parameter. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + tuple_param: Name of the tuple parameter + + Returns: + List of field names in the tuple + """ + with open(abi_filename, 'r', encoding='utf-8') as f: + abi = json.load(f) + + # Find the function + for item in abi: + if item.get('type') == 'function' and item.get('name') == function_name: + # Find the tuple parameter + for param in item.get('inputs', []): + if param.get('name') == tuple_param: + if 'components' not in param: + raise ValueError(f"Parameter '{tuple_param}' is not a tuple") + return [c.get('name', f'field_{i}') for i, c in enumerate(param['components'])] + + raise ValueError(f"Tuple parameter '{tuple_param}' not found") + + raise ValueError(f"Function '{function_name}' not found in ABI") + + +def get_path(abi_filename: str, function_name: str, param_name: str) -> List: + """ + Get the DataPath components for a specific parameter. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + param_name: Name of the parameter to find + + Returns: + List of DataPath components [PathTuple, PathRef, etc.] + """ + with open(abi_filename, 'r', encoding='utf-8') as f: + abi = json.load(f) + + # Find the function in ABI + function_abi = None + for item in abi: + if item.get('type') == 'function' and item.get('name') == function_name: + function_abi = item + break + + if not function_abi: + raise ValueError(f"Function '{function_name}' not found in ABI") + + # Find the parameter + for idx, param in enumerate(function_abi['inputs']): + if param.get('name') == param_name: + return build_path(param, idx) + + # If not found, list available parameters for debugging + available = [p.get('name', f'param_{i}') for i, p in enumerate(function_abi['inputs'])] + raise ValueError( + f"Parameter '{param_name}' not found in function '{function_name}'. " + f"Available parameters: {', '.join(available)}" + ) + + +def build_path(param: dict, tuple_index: int, parent_path: List = None) -> List: + """ + Build the path components for a parameter. + + Returns a list of GCS path objects ready to use in DataPath(). + """ + if parent_path is None: + parent_path = [] + + path = parent_path + [PathTuple(tuple_index)] + param_type = param['type'] + + # Check if the parameter is dynamic + if is_dynamic_type(param_type, param): + path.append(PathRef()) + + # Handle arrays + if '[]' in param_type: + path.append(PathArray(1)) # Default weight=1 + + # Determine the element type (remove the []) + element_type = param_type.rstrip('[]') + + # Check if array elements are dynamic + # For primitive types like uint256[], the elements are static + if element_type in ['string', 'bytes'] or 'tuple' in element_type: + # Elements themselves are dynamic + path.append(PathLeaf(PathLeafType.DYNAMIC)) + else: + # Elements are static (uint256, address, etc.) + path.append(PathLeaf(PathLeafType.STATIC)) + elif param_type in ['string', 'bytes']: + # Dynamic bytes/string + path.append(PathLeaf(PathLeafType.DYNAMIC)) + else: + # Dynamic tuple + path.append(PathLeaf(PathLeafType.STATIC)) + else: + # Static type - direct leaf + path.append(PathLeaf(PathLeafType.STATIC)) + + return path + + +def get_nested_path(parent_abi: str, + parent_function: str, + parent_param: str, + nested_abi: str, + nested_function: str, + nested_param: str) -> List: + """ + Get the DataPath for a nested parameter inside a calldata field. + + Example: + Safe.execTransaction() has a 'data' parameter containing addOwnerWithThreshold() + To get the path to 'owner' inside addOwnerWithThreshold: + + get_nested_path( + "abis/safe_1.4.1.abi.json", "execTransaction", "data", + "abis/safe_1.4.1.abi.json", "addOwnerWithThreshold", "owner" + ) + """ + # Get parent path (to the calldata field) + parent_path = get_path(parent_abi, parent_function, parent_param) + + # Remove the final PathLeaf from parent (we'll go deeper) + parent_path_clean = [p for p in parent_path if not isinstance(p, PathLeaf)] + + # Get nested path + nested_path = get_path(nested_abi, nested_function, nested_param) + + # The nested path starts at offset 0 within the calldata, so we keep its PathTuple + # but it's relative to the parent's data field + return parent_path_clean + nested_path + + +def get_path_in_array(abi_filename: str, + function_name: str, + array_param: str, + element_index: int = None) -> List: + """ + Get the DataPath for an element inside an array parameter. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + array_param: Name of the array parameter + element_index: Specific index in array, or None for all elements + + Returns: + List of DataPath components + """ + path = get_path(abi_filename, function_name, array_param) + + # The path should already have PathRef() and PathArray() + # We just need to specify the element if needed + if element_index is not None: + # Replace generic PathArray with indexed one + for component in path: + if isinstance(component, PathArray): + # For specific index, we might need different handling + # depending on your GCS implementation + pass + + return path + + +def get_path_in_tuple(abi_filename: str, + function_name: str, + tuple_param: str, + field_name: str) -> List: + """ + Get the DataPath for a field inside a tuple parameter. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + tuple_param: Name of the tuple parameter + field_name: Name of the field in the tuple (or index as string) + + Returns: + List of DataPath components + """ + with open(abi_filename, 'r', encoding='utf-8') as f: + abi = json.load(f) + + # Find the function + function_abi = None + for item in abi: + if item.get('type') == 'function' and item.get('name') == function_name: + function_abi = item + break + + if not function_abi: + raise ValueError(f"Function '{function_name}' not found") + + # Find the tuple parameter + for idx, param in enumerate(function_abi['inputs']): + if param.get('name') == tuple_param: + if 'components' not in param: + raise ValueError(f"Parameter '{tuple_param}' is not a tuple") + + # Build path to the tuple parameter itself + base_path = build_path(param, idx) + + # Remove the final leaf (we're going deeper into the tuple) + base_path = [p for p in base_path if not isinstance(p, PathLeaf)] + + # Find the field in the tuple components + for field_idx, component in enumerate(param['components']): + if component.get('name') == field_name: + # Add path to the specific field + field_path = [PathTuple(field_idx)] + + # Add final leaf based on field type + if is_dynamic_type(component['type'], component): + field_path.extend([PathRef(), PathLeaf(PathLeafType.DYNAMIC)]) + else: + field_path.append(PathLeaf(PathLeafType.STATIC)) + + return base_path + field_path + + # List available fields for debugging + available = [c.get('name', f'field_{i}') for i, c in enumerate(param['components'])] + raise ValueError( + f"Field '{field_name}' not found in tuple '{tuple_param}'. " + f"Available fields: {', '.join(available)}" + ) + + raise ValueError(f"Tuple parameter '{tuple_param}' not found") + + +def get_all_paths(abi_filename: str, function_name: str) -> dict[str, List]: + """ + Get a dictionary mapping parameter names to their DataPath components. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + + Returns: + Dictionary with parameter names as keys and path lists as values + """ + params = get_function_params(abi_filename, function_name) + return {param: get_path(abi_filename, function_name, param) for param in params} + + +def get_all_tuple_paths(abi_filename: str, function_name: str, tuple_param: str) -> dict[str, List]: + """ + Get a dictionary mapping tuple field names to their DataPath components. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + tuple_param: Name of the tuple parameter + + Returns: + Dictionary with field names as keys and path lists as values + """ + fields = get_tuple_fields(abi_filename, function_name, tuple_param) + return {field: get_path_in_tuple(abi_filename, function_name, tuple_param, field) for field in fields} + + +def get_all_tuple_array_paths(abi_filename: str, function_name: str, array_param: str) -> dict[str, List]: + """ + Get paths for fields inside a tuple that is in an array. + + For example: batchExecute has a parameter 'calls' which is an array of tuples. + Each tuple has fields 'to', 'value', 'data'. + + This function returns the complete path including the array navigation. + + Args: + abi_filename: Path to ABI JSON file + function_name: Name of the function + array_param: Name of the array parameter (containing tuples) + + Returns: + Dictionary with field names as keys and complete path lists as values + """ + with open(abi_filename, 'r', encoding='utf-8') as f: + abi = json.load(f) + + # Find the function + function_abi = None + for item in abi: + if item.get('type') == 'function' and item.get('name') == function_name: + function_abi = item + break + + if not function_abi: + raise ValueError(f"Function '{function_name}' not found") + + # Find the array parameter + for idx, param in enumerate(function_abi['inputs']): + if param.get('name') == array_param: + if not param['type'].endswith('[]'): + raise ValueError(f"Parameter '{array_param}' is not an array") + + if 'components' not in param: + raise ValueError(f"Array parameter '{array_param}' does not contain tuples") + + # Build base path to the array + base_path = [PathTuple(idx), PathRef(), PathArray(1)] + + # If the tuple itself is dynamic, add another PathRef + # (this happens when the tuple contains dynamic types) + if is_dynamic_type(param['type'].rstrip('[]'), param): + base_path.append(PathRef()) + + # Get paths for each field in the tuple + result = {} + for field_idx, component in enumerate(param['components']): + field_name = component.get('name', f'field_{field_idx}') + + # Add the tuple field index + field_path = base_path + [PathTuple(field_idx)] + + # Add final leaf based on field type + if is_dynamic_type(component['type'], component): + field_path.extend([PathRef(), PathLeaf(PathLeafType.DYNAMIC)]) + else: + field_path.append(PathLeaf(PathLeafType.STATIC)) + + result[field_name] = field_path + + return result + + raise ValueError(f"Array parameter '{array_param}' not found") + + +# Helper to print the path in a readable format +def print_path(path: List) -> None: + """Convert path components to readable string.""" + components = [] + for p in path: + if isinstance(p, PathTuple): + components.append(f"PathTuple({p.value})") + elif isinstance(p, PathRef): + components.append("PathRef()") + elif isinstance(p, PathArray): + components.append(f"PathArray({p.weight})") + elif isinstance(p, PathLeaf): + components.append(f"PathLeaf(PathLeafType.{p.type.name})") + result = "[\n " + ",\n ".join(components) + ",\n ]" + print(result) + + +def print_all_paths(param_paths: dict[str, List]) -> None: + """Print all the paths in a readable format.""" + for param_name, path in param_paths.items(): + print(f"path for {param_name}:") + print_path(path) + print("---") + + +# Example usage +if __name__ == "__main__": + print("# Example 1: Simple ERC20 transfer") + print("# Get path for '_to' parameter:") + ex_path = get_path("abis/erc20.json", "transfer", "_to") + print_path(ex_path) + + print("\n# Example 2: Nested calldata") + print("# Safe execTransaction -> addOwnerWithThreshold -> owner") + ex_path = get_nested_path( + "abis/safe_1.4.1.abi.json", "execTransaction", "data", + "abis/safe_1.4.1.abi.json", "addOwnerWithThreshold", "owner" + ) + print_path(ex_path) + + print("\n# Example 3: Tuple field in batch.json") + print("# batchExecute -> calls[].to") + ex_path = get_path_in_tuple("abis/batch.json", "batchExecute", "calls", "to") + print_path(ex_path) + + print("\n# Example 4: Array element") + print("# ERC1155 safeBatchTransferFrom -> _ids[]") + ex_path = get_path("abis/erc1155.json", "safeBatchTransferFrom", "_ids") + print_path(ex_path) diff --git a/tests/ragger/test_eip712.py b/tests/ragger/test_eip712.py index f0f420826..a90323a25 100644 --- a/tests/ragger/test_eip712.py +++ b/tests/ragger/test_eip712.py @@ -1,3 +1,5 @@ +# pylint: disable=too-many-lines +# Large test file containing multiple test cases for EIP-712 signing import fnmatch import os from pathlib import Path @@ -9,6 +11,9 @@ import pytest from eth_account.messages import encode_typed_data from constants import ABIS_FOLDER +from test_gcs import compute_inst_hash +from fields_utils import get_all_tuple_array_paths, get_all_paths + import web3 from ragger.backend import BackendInterface @@ -25,9 +30,8 @@ from client.proxy_info import ProxyInfo from client.gcs import ( - Field, ParamType, ParamRaw, Value, TypeFamily, DataPath, PathTuple, ParamTrustedName, - ParamNFT, ParamDatetime, DatetimeType, ParamTokenAmount, ParamToken, ParamCalldata, - ParamAmount, ContainerPath, PathLeaf, PathLeafType, PathRef, PathArray, TxInfo + Field, ParamRaw, Value, TypeFamily, DataPath, PathTuple, ParamTokenAmount, ParamCalldata, + ContainerPath, PathLeaf, PathLeafType, TxInfo ) @@ -375,13 +379,12 @@ def data_set_fixture(request) -> DataSet: def test_eip712_advanced_filtering(scenario_navigator: NavigateWithScenario, - test_name: str, data_set: DataSet, verbose_raw: bool): if verbose_raw and data_set.suffix: pytest.skip("Skipping Verbose mode for this data sets") - snapshots_dirname = test_name + data_set.suffix + snapshots_dirname = scenario_navigator.test_name + data_set.suffix if verbose_raw: settings_toggle(scenario_navigator.backend.device, scenario_navigator.navigator, [SettingID.DISPLAY_HASH]) snapshots_dirname += "-verbose" @@ -390,7 +393,6 @@ def test_eip712_advanced_filtering(scenario_navigator: NavigateWithScenario, def test_eip712_filtering_empty_array(scenario_navigator: NavigateWithScenario, - test_name: str, simu_params: Optional[TxSimu] = None): app_client = EthAppClient(scenario_navigator.backend) @@ -466,7 +468,7 @@ def test_eip712_filtering_empty_array(scenario_navigator: NavigateWithScenario, response = app_client.provide_tx_simulation(simu_params) assert response.status == StatusWord.OK - eip712_new_common(scenario_navigator, data, filters, test_name, with_warning=bool(simu_params is not None)) + eip712_new_common(scenario_navigator, data, filters, scenario_navigator.test_name, with_warning=bool(simu_params is not None)) TOKENS = [ @@ -497,9 +499,8 @@ def tokens_fixture(request) -> list[dict]: def test_eip712_advanced_missing_token(scenario_navigator: NavigateWithScenario, - test_name: str, tokens: list[dict]): - test_name += f"-{len(tokens[0]) == 0}-{len(tokens[1]) == 0}" + test_name = f"{scenario_navigator.test_name}-{len(tokens[0]) == 0}-{len(tokens[1]) == 0}" data = { "types": { @@ -583,10 +584,9 @@ def filt_tn_types_fixture(request) -> list[TrustedNameType]: def test_eip712_advanced_trusted_name(scenario_navigator: NavigateWithScenario, - test_name: str, trusted_name: tuple, filt_tn_types: list[TrustedNameType]): - test_name += f"_{trusted_name[0].name.lower()}_with" + test_name = f"{scenario_navigator.test_name}_{trusted_name[0].name.lower()}_with" for t in filt_tn_types: test_name += f"_{t.name.lower()}" @@ -706,15 +706,13 @@ def gcs_handler(app_client: EthAppClient, json_data: dict) -> None: ), ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, json_data["domain"]["chainId"], bytes.fromhex(json_data["message"]["to"][2:]), get_selector_from_data(json_data["message"]["data"]), - inst_hash.digest(), + inst_hash, "Token transfer", contract_name="USDC", ) @@ -728,8 +726,7 @@ def gcs_handler(app_client: EthAppClient, json_data: dict) -> None: def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: # Load EIP-712 JSON data - filename = "safe_batch" - with open(f"{eip712_json_path()}/{filename}.json", encoding="utf-8") as file: + with open(f"{eip712_json_path()}/safe_batch.json", encoding="utf-8") as file: data = json.load(file) # Define tokens @@ -780,7 +777,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: ]]) # Top level transaction fields definition - # Intermediate execTransaction transaction fields definition + param_paths = get_all_tuple_array_paths(f"{ABIS_FOLDER}/batch.json", "batchExecute", "calls") L0_fields = [ Field( 1, @@ -792,15 +789,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -808,14 +797,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), amount=Value( @@ -823,23 +805,14 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: TypeFamily.UINT, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["value"] ), ), ) ), ] # compute instructions hash - L0_hash = hashlib.sha3_256() - for field in L0_fields: - L0_hash.update(field.serialize()) + L0_hash = compute_inst_hash(L0_fields) # Define intermediate execTransaction transaction info L0_tx_info = TxInfo( @@ -847,7 +820,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: data["domain"]["chainId"], bytes.fromhex(json_data["domain"]["verifyingContract"][2:]), get_selector_from_data(batchData), - L0_hash.digest(), + L0_hash, "Batch transactions", creator_name="Ledger", creator_legal_name="Ledger Multisig", @@ -856,6 +829,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: ) # Lower batchExecute transaction fields definition + param_paths = get_all_paths(f"{ABIS_FOLDER}/erc20.json", "transfer") L1_fields = [ Field( 1, @@ -867,10 +841,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: TypeFamily.UINT, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_value"] ), type_size=32, ), @@ -891,19 +862,14 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_to"] ), ) ) ), ] # compute instructions hash - L1_hash = hashlib.sha3_256() - for sub_field in L1_fields: - L1_hash.update(sub_field.serialize()) + L1_hash = compute_inst_hash(L1_fields) # Define lower batchExecute transaction info L1_tx_info = [ @@ -912,7 +878,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: data["domain"]["chainId"], tokens[0]["address"], get_selector_from_data(tokenData0), - L1_hash.digest(), + L1_hash, "Send", contract_name="USD_Coin", ), @@ -921,7 +887,7 @@ def gcs_handler_batch(app_client: EthAppClient, json_data: dict) -> None: data["domain"]["chainId"], tokens[1]["address"], get_selector_from_data(tokenData1), - L1_hash.digest(), + L1_hash, "Send", contract_name="USD_Coin", ) @@ -1090,8 +1056,7 @@ def test_eip712_gondi(scenario_navigator: NavigateWithScenario): def test_eip712_batch(scenario_navigator: NavigateWithScenario): - filename = "safe_batch" - with open(f"{eip712_json_path()}/{filename}.json", encoding="utf-8") as file: + with open(f"{eip712_json_path()}/safe_batch.json", encoding="utf-8") as file: data = json.load(file) filters = { diff --git a/tests/ragger/test_gcs.py b/tests/ragger/test_gcs.py index 7a668ba0d..3ea78fb67 100644 --- a/tests/ragger/test_gcs.py +++ b/tests/ragger/test_gcs.py @@ -9,22 +9,30 @@ from dynamic_networks_cfg import get_network_config from constants import ABIS_FOLDER +from fields_utils import get_all_tuple_array_paths, get_all_paths, get_all_tuple_paths import client.response_parser as ResponseParser from client.client import EthAppClient, SignMode, TrustedNameType, TrustedNameSource from client.status_word import StatusWord from client.utils import get_selector_from_data from client.gcs import ( - Field, ParamType, ParamRaw, Value, TypeFamily, DataPath, PathTuple, ParamTrustedName, + Field, ParamType, ParamRaw, Value, TypeFamily, DataPath, ParamTrustedName, ParamNFT, ParamDatetime, DatetimeType, ParamTokenAmount, ParamToken, ParamCalldata, - ParamAmount, ContainerPath, PathLeaf, PathLeafType, PathRef, PathArray, TxInfo + ParamAmount, ContainerPath, TxInfo ) from client.tx_simu import TxSimu from client.proxy_info import ProxyInfo from client.dynamic_networks import DynamicNetwork -def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): +def compute_inst_hash(fields: list[Field]) -> bytes: + inst_hash = hashlib.sha3_256() + for field in fields: + inst_hash.update(field.serialize()) + return inst_hash.digest() + + +def test_gcs_nft(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -66,6 +74,7 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/erc1155.json", "safeBatchTransferFrom") fields = [ Field( 1, @@ -77,10 +86,7 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_from"] ), ), [ @@ -108,10 +114,7 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_to"] ), ) ) @@ -127,12 +130,7 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(2), - PathRef(), - PathArray(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_ids"] ), ), Value( @@ -153,12 +151,7 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(3), - PathRef(), - PathArray(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_values"] ), ) ) @@ -173,11 +166,7 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(4), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["_data"] ), ) ) @@ -185,16 +174,14 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "batch transfer NFTs", ) @@ -212,11 +199,10 @@ def test_gcs_nft(scenario_navigator: NavigateWithScenario, test_name: str): app_client.provide_transaction_field_desc(field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() def test_gcs_poap(scenario_navigator: NavigateWithScenario, - test_name: str, simu_params: Optional[TxSimu] = None): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -256,6 +242,7 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/poap.abi.json", "mintToken") fields = [ Field( 1, @@ -268,10 +255,7 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, type_size=32, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["eventId"] ), ) ) @@ -287,10 +271,7 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["tokenId"] ), ) ) @@ -305,10 +286,7 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(2), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["receiver"] ), ) ) @@ -324,10 +302,7 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, type_size=32, data_path=DataPath( 1, - [ - PathTuple(3), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["expirationTime"] ), ), DatetimeType.DT_UNIX @@ -343,11 +318,7 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(4), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["signature"] ), ) ) @@ -355,16 +326,14 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "mint POAP", creator_name="POAP", creator_legal_name="Proof of Attendance Protocol", @@ -380,12 +349,12 @@ def test_gcs_poap(scenario_navigator: NavigateWithScenario, with app_client.sign(mode=SignMode.START_FLOW): if simu_params is not None: - scenario_navigator.review_approve_with_warning(test_name=test_name) + scenario_navigator.review_approve_with_warning() else: - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_1inch(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -420,6 +389,8 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/1inch.abi.json", "swap") + param_tuple_paths = get_all_tuple_paths(f"{ABIS_FOLDER}/1inch.abi.json", "swap", "desc") fields = [ Field( 1, @@ -431,10 +402,7 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["executor"] ), ) ) @@ -450,11 +418,7 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathTuple(4), - PathLeaf(PathLeafType.STATIC), - ] + param_tuple_paths["amount"] ), ), token=Value( @@ -462,11 +426,7 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(1), - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_tuple_paths["srcToken"] ), ), native_currency=[ @@ -485,11 +445,7 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathTuple(5), - PathLeaf(PathLeafType.STATIC), - ] + param_tuple_paths["minReturnAmount"] ), ), token=Value( @@ -497,11 +453,7 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(1), - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_tuple_paths["dstToken"] ), ), native_currency=[ @@ -512,16 +464,14 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "swap", creator_name="1inch", creator_legal_name="1inch Network", @@ -538,10 +488,10 @@ def test_gcs_1inch(scenario_navigator: NavigateWithScenario, test_name: str): app_client.provide_transaction_field_desc(field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_proxy(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_proxy(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -568,6 +518,7 @@ def test_gcs_proxy(scenario_navigator: NavigateWithScenario, test_name: str): with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/proxy_implem.abi.json", "transferOwnership") fields = [ Field( 1, @@ -579,10 +530,7 @@ def test_gcs_proxy(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["newOwner"] ), ), [TrustedNameType.CONTRACT], @@ -592,9 +540,7 @@ def test_gcs_proxy(scenario_navigator: NavigateWithScenario, test_name: str): ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, @@ -602,7 +548,7 @@ def test_gcs_proxy(scenario_navigator: NavigateWithScenario, test_name: str): # address of the implementation contract bytes.fromhex("1784be6401339fc0fedf7e9379409f5c1bfe9dda"), get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "transfer ownership", creator_name="EigenLayer", creator_legal_name="Eigen Labs", @@ -645,10 +591,10 @@ def test_gcs_proxy(scenario_navigator: NavigateWithScenario, test_name: str): app_client.provide_transaction_field_desc(field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_4226(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_4226(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -674,6 +620,7 @@ def test_gcs_4226(scenario_navigator: NavigateWithScenario, test_name: str): pass swell_token_addr = bytes.fromhex("0a6e7ba5042b38349e437ec6db6214aec7b35676") + param_paths = get_all_paths(f"{ABIS_FOLDER}/rSWELL.abi.json", "deposit") fields = [ Field( 1, @@ -686,10 +633,7 @@ def test_gcs_4226(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["assets"] ), ), token=Value( @@ -721,10 +665,7 @@ def test_gcs_4226(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["receiver"] ), ), ) @@ -732,16 +673,14 @@ def test_gcs_4226(scenario_navigator: NavigateWithScenario, test_name: str): ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "deposit", creator_name="Swell", creator_legal_name="Swell Network", @@ -759,11 +698,11 @@ def test_gcs_4226(scenario_navigator: NavigateWithScenario, test_name: str): app_client.provide_transaction_field_desc(field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() # https://etherscan.io/tx/0x07a80f1b359146129f3369af39e7eb2457581109c8300fc2ef81e997a07cf3f0 -def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -816,6 +755,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_proxy_factory_1.4.1.abi.json", "createProxyWithNonce") fields = [ Field( 1, @@ -827,10 +767,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_singleton"] ), ), ) @@ -845,11 +782,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(1), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["initializer"] ), ), Value( @@ -857,10 +790,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_singleton"] ), ), ) @@ -876,10 +806,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari type_size=32, data_path=DataPath( 1, - [ - PathTuple(2), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["saltNonce"] ), ), ) @@ -887,16 +814,14 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "create a Safe account", creator_name="Safe", creator_legal_name="Safe Ecosystem Foundation", @@ -905,6 +830,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari app_client.provide_transaction_info(tx_info.serialize()) + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "setup") sub_fields = [ Field( 1, @@ -916,12 +842,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_owners"] ), ), ) @@ -937,10 +858,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_threshold"] ), ), ) @@ -955,10 +873,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(2), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), ) @@ -973,11 +888,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(3), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -985,10 +896,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(2), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), ) @@ -1003,10 +911,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(4), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["fallbackHandler"] ), ), ) @@ -1021,10 +926,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(5), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["paymentToken"] ), ), ) @@ -1040,10 +942,7 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari type_size=32, data_path=DataPath( 1, - [ - PathTuple(6), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["payment"] ), ), ) @@ -1058,29 +957,24 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(7), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["paymentReceiver"] ), ), ) ), ] # compute instructions hash - sub_inst_hash = hashlib.sha3_256() - for sub_field in sub_fields: - sub_inst_hash.update(sub_field.serialize()) - + sub_inst_hash = compute_inst_hash(sub_fields) sub_tx_info = TxInfo( 1, tx_params["chainId"], safe.address, get_selector_from_data(safe_data), - sub_inst_hash.digest(), + sub_inst_hash, "setup", ) + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_l2_setup_1.4.1.abi.json", "setupToL2") sub_sub_fields = [ Field( 1, @@ -1092,25 +986,20 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["l2Singleton"] ), ), ) ), ] # compute instructions hash - sub_sub_inst_hash = hashlib.sha3_256() - for sub_sub_field in sub_sub_fields: - sub_sub_inst_hash.update(sub_sub_field.serialize()) + sub_sub_inst_hash = compute_inst_hash(sub_sub_fields) sub_sub_tx_info = TxInfo( 1, tx_params["chainId"], safe_l2_setup.address, get_selector_from_data(safe_l2_setup_data), - sub_sub_inst_hash.digest(), + sub_sub_inst_hash, "L2 setup", ) @@ -1126,11 +1015,11 @@ def test_gcs_nested_createProxyWithNonce(scenario_navigator: NavigateWithScenari app_client.provide_transaction_field_desc(sub_sub_field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() # https://etherscan.io/tx/0xc5545f13bfaf6f69ae937bc64337405060dc56ce7649ea7051d2bbc3b4316b79 -def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -1167,6 +1056,7 @@ def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenari with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "execTransaction") fields = [ Field( 1, @@ -1178,11 +1068,7 @@ def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenari TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -1190,10 +1076,7 @@ def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenari TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), amount=Value( @@ -1202,10 +1085,7 @@ def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenari type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["value"] ), ), spender=Value( @@ -1218,16 +1098,14 @@ def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenari ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "execute a Safe action", creator_name="Safe", creator_legal_name="Safe Ecosystem Foundation", @@ -1240,11 +1118,11 @@ def test_gcs_nested_execTransaction_send(scenario_navigator: NavigateWithScenari app_client.provide_transaction_field_desc(field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() # https://etherscan.io/tx/0xbeafe22c9e3ddcf85b06f65a56cc3ea8f5b02c323cc433c93c103ad3526db88d -def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -1285,6 +1163,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "execTransaction") fields = [ Field( 1, @@ -1296,10 +1175,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), ) @@ -1315,10 +1191,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["value"] ), ), ) @@ -1333,11 +1206,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -1345,10 +1214,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), ) @@ -1364,10 +1230,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na type_size=1, data_path=DataPath( 1, - [ - PathTuple(3), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["operation"] ), ), ) @@ -1383,10 +1246,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na type_size=32, data_path=DataPath( 1, - [ - PathTuple(4), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["safeTxGas"] ), ), ) @@ -1402,10 +1262,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na type_size=32, data_path=DataPath( 1, - [ - PathTuple(5), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["baseGas"] ), ), ) @@ -1421,10 +1278,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na type_size=32, data_path=DataPath( 1, - [ - PathTuple(6), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["gasPrice"] ), ), ) @@ -1439,10 +1293,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(7), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["gasToken"] ), ), ) @@ -1457,10 +1308,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(8), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["refundReceiver"] ), ), ) @@ -1475,11 +1323,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(9), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["signatures"] ), ), ) @@ -1487,16 +1331,14 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "execute a Safe action", creator_name="Safe", creator_legal_name="Safe Ecosystem Foundation", @@ -1505,6 +1347,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na app_client.provide_transaction_info(tx_info.serialize()) + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "addOwnerWithThreshold") sub_fields = [ Field( 1, @@ -1516,10 +1359,7 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["owner"] ), ), ) @@ -1535,26 +1375,21 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_threshold"] ), ), ) ), ] # compute instructions hash - sub_inst_hash = hashlib.sha3_256() - for sub_field in sub_fields: - sub_inst_hash.update(sub_field.serialize()) + sub_inst_hash = compute_inst_hash(sub_fields) sub_tx_info = TxInfo( 1, tx_params["chainId"], contract.address, get_selector_from_data(sub_data), - sub_inst_hash.digest(), + sub_inst_hash, "add owner with threshold", ) @@ -1566,11 +1401,11 @@ def test_gcs_nested_execTransaction_addOwnerWithThreshold(scenario_navigator: Na app_client.provide_transaction_field_desc(sub_field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() # https://etherscan.io/tx/0x5047fedc98f46d2afd94d0a2813ddf0c8fe777ec0739ffd327586a91e1e5a89a -def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -1610,6 +1445,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "execTransaction") fields = [ Field( 1, @@ -1621,10 +1457,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), [TrustedNameType.ACCOUNT], @@ -1642,10 +1475,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["value"] ), ), ) @@ -1660,11 +1490,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -1672,10 +1498,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), ) @@ -1691,10 +1514,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate type_size=1, data_path=DataPath( 1, - [ - PathTuple(3), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["operation"] ), ), ) @@ -1710,10 +1530,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate type_size=32, data_path=DataPath( 1, - [ - PathTuple(4), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["safeTxGas"] ), ), ) @@ -1729,10 +1546,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate type_size=32, data_path=DataPath( 1, - [ - PathTuple(5), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["baseGas"] ), ), ) @@ -1748,10 +1562,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate type_size=32, data_path=DataPath( 1, - [ - PathTuple(6), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["gasPrice"] ), ), ) @@ -1766,10 +1577,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(7), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["gasToken"] ), ), ) @@ -1784,10 +1592,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(8), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["refundReceiver"] ), ), ) @@ -1802,11 +1607,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(9), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["signatures"] ), ), ) @@ -1814,16 +1615,14 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "execute a Safe action", creator_name="Safe", creator_legal_name="Safe Ecosystem Foundation", @@ -1832,6 +1631,7 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate app_client.provide_transaction_info(tx_info.serialize()) + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "changeThreshold") sub_fields = [ Field( 1, @@ -1844,26 +1644,21 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate type_size=32, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_threshold"] ), ), ) ), ] # compute instructions hash - sub_inst_hash = hashlib.sha3_256() - for sub_field in sub_fields: - sub_inst_hash.update(sub_field.serialize()) + sub_inst_hash = compute_inst_hash(sub_fields) sub_tx_info = TxInfo( 1, tx_params["chainId"], contract.address, get_selector_from_data(sub_data), - sub_inst_hash.digest(), + sub_inst_hash, "change threshold", ) @@ -1882,10 +1677,10 @@ def test_gcs_nested_execTransaction_changeThreshold(scenario_navigator: Navigate app_client.provide_transaction_field_desc(sub_field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_nested_no_param(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_nested_no_param(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -1927,6 +1722,7 @@ def test_gcs_nested_no_param(scenario_navigator: NavigateWithScenario, test_name with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "execTransaction") fields = [ Field( 1, @@ -1938,11 +1734,7 @@ def test_gcs_nested_no_param(scenario_navigator: NavigateWithScenario, test_name TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -1950,10 +1742,7 @@ def test_gcs_nested_no_param(scenario_navigator: NavigateWithScenario, test_name TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), ) @@ -1961,16 +1750,14 @@ def test_gcs_nested_no_param(scenario_navigator: NavigateWithScenario, test_name ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "execute a Safe action", creator_name="Safe", creator_legal_name="Safe Ecosystem Foundation", @@ -1997,10 +1784,10 @@ def test_gcs_nested_no_param(scenario_navigator: NavigateWithScenario, test_name app_client.provide_transaction_info(sub_tx_info.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_no_param(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_no_param(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -2039,10 +1826,10 @@ def test_gcs_no_param(scenario_navigator: NavigateWithScenario, test_name: str): app_client.provide_transaction_info(tx_info.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -2088,6 +1875,7 @@ def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario, test_n with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_tuple_paths(f"{ABIS_FOLDER}/1inch.abi.json", "swap", "desc") fields = [ Field( 1, @@ -2099,11 +1887,7 @@ def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario, test_n TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(1), - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["srcToken"] ), ), [TrustedNameType.TOKEN], @@ -2120,11 +1904,7 @@ def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario, test_n TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(1), - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["dstToken"] ), ), [TrustedNameType.TOKEN], @@ -2134,16 +1914,13 @@ def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario, test_n ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) - + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], tx_params["to"], get_selector_from_data(tx_params["data"]), - inst_hash.digest(), + inst_hash, "swap", creator_name="1inch", creator_legal_name="1inch Network", @@ -2154,8 +1931,7 @@ def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario, test_n app_client.provide_transaction_info(tx_info.serialize()) - i = 0 - for field in fields: + for i, field in enumerate(fields): challenge = ResponseParser.challenge(app_client.get_challenge().data) app_client.provide_trusted_name_v2(tokens[i]["address"], tokens[i]["name"], @@ -2164,13 +1940,12 @@ def test_gcs_trusted_name_token(scenario_navigator: NavigateWithScenario, test_n tx_params["chainId"], challenge=challenge) app_client.provide_transaction_field_desc(field.serialize()) - i += 1 with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_batch(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -2232,6 +2007,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + param_paths = get_all_paths(f"{ABIS_FOLDER}/erc20.json", "transfer") sub_fields = [ Field( 1, @@ -2243,10 +2019,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_to"] ), ) ) @@ -2259,14 +2032,11 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): Value( 1, TypeFamily.UINT, - data_path=DataPath( + 32, + DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_value"] ), - type_size=32, ), Value( 1, @@ -2277,6 +2047,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): ), ] + param_paths = get_all_tuple_array_paths(f"{ABIS_FOLDER}/batch.json", "batchExecute", "calls") fields = [ Field( 1, @@ -2288,15 +2059,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -2304,14 +2067,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), amount=Value( @@ -2319,14 +2075,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.UINT, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["value"] ), ), ) @@ -2334,16 +2083,14 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): ] # compute instructions hash - inst_hash = hashlib.sha3_256() - for field in fields: - inst_hash.update(field.serialize()) + inst_hash = compute_inst_hash(fields) tx_info = TxInfo( 1, tx_params["chainId"], contract.address, get_selector_from_data(data), - inst_hash.digest(), + inst_hash, "Batch transaction", creator_name="WETH", creator_legal_name="Wrapped Ether", @@ -2353,9 +2100,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): app_client.provide_transaction_info(tx_info.serialize()) # compute instructions hash - sub_inst_hash = hashlib.sha3_256() - for sub_field in sub_fields: - sub_inst_hash.update(sub_field.serialize()) + sub_inst_hash = compute_inst_hash(sub_fields) sub_tx_info = [ TxInfo( @@ -2363,7 +2108,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): tx_params["chainId"], tokens[0]["address"], get_selector_from_data(data0), - sub_inst_hash.digest(), + sub_inst_hash, "Transfer token", ), TxInfo( @@ -2371,7 +2116,7 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): tx_params["chainId"], tokens[1]["address"], get_selector_from_data(data1), - sub_inst_hash.digest(), + sub_inst_hash, "Transfer token", ) ] @@ -2388,10 +2133,10 @@ def test_gcs_batch(scenario_navigator: NavigateWithScenario, test_name: str): app_client.provide_transaction_field_desc(sub_field.serialize()) with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() -def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): +def test_gcs_batch_2(scenario_navigator: NavigateWithScenario): backend = scenario_navigator.backend app_client = EthAppClient(backend) @@ -2479,6 +2224,8 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): with app_client.sign("m/44'/60'/0'/0/0", tx_params, mode=SignMode.STORE): pass + print(f"{'~' * 20} DEBUG L0 {'~' * 20}") + param_paths = get_all_paths(f"{ABIS_FOLDER}/safe_1.4.1.abi.json", "execTransaction") # Top level transaction fields definition L0_fields = [ Field( @@ -2519,11 +2266,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -2531,10 +2274,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), amount=Value( @@ -2543,10 +2283,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["value"] ), ), spender=Value( @@ -2567,10 +2304,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(4), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["safeTxGas"] ), ), ) @@ -2586,10 +2320,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): type_size=32, data_path=DataPath( 1, - [ - PathTuple(5), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["baseGas"] ), ), Value( @@ -2597,10 +2328,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(6), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["gasPrice"] ), ), [bytes.fromhex("0000000000000000000000000000000000000000")] @@ -2616,10 +2344,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(8), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["refundReceiver"] ), ), [TrustedNameType.ACCOUNT, TrustedNameType.CONTRACT, TrustedNameType.TOKEN], @@ -2628,9 +2353,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): ), ] # compute instructions hash - L0_hash = hashlib.sha3_256() - for field in L0_fields: - L0_hash.update(field.serialize()) + L0_hash = compute_inst_hash(L0_fields) # Define top level transaction info L0_tx_info = TxInfo( @@ -2638,7 +2361,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): tx_params["chainId"], bytes.fromhex("29fcb43b46531bca003ddc8fcb67ffe91900c762"), get_selector_from_data(tx_params["data"]), - L0_hash.digest(), + L0_hash, "sign multisig operation", creator_name="Safe", creator_legal_name="Safe{Wallet}", @@ -2646,6 +2369,8 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): contract_name="SafeL2", ) + print(f"{'~' * 20} DEBUG L1 {'~' * 20}") + param_paths = get_all_tuple_array_paths(f"{ABIS_FOLDER}/batch.json", "batchExecute", "calls") # Intermediate execTransaction transaction fields definition L1_fields = [ Field( @@ -2658,15 +2383,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.BYTES, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(2), - PathRef(), - PathLeaf(PathLeafType.DYNAMIC), - ] + param_paths["data"] ), ), Value( @@ -2674,14 +2391,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["to"] ), ), amount=Value( @@ -2689,23 +2399,14 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.UINT, data_path=DataPath( 1, - [ - PathTuple(0), - PathRef(), - PathArray(), - PathRef(), - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["value"] ), ), ) ), ] # compute instructions hash - L1_hash = hashlib.sha3_256() - for field in L1_fields: - L1_hash.update(field.serialize()) + L1_hash = compute_inst_hash(L1_fields) # Define intermediate execTransaction transaction info L1_tx_info = [ @@ -2714,7 +2415,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): tx_params["chainId"], contract.address, get_selector_from_data(batchData), - L1_hash.digest(), + L1_hash, "Batch transactions", creator_name="Ledger", creator_legal_name="Ledger Multisig", @@ -2723,6 +2424,8 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): ), ] + print(f"{'~' * 20} DEBUG L2 {'~' * 20}") + param_paths = get_all_paths(f"{ABIS_FOLDER}/erc20.json", "transfer") # Lower batchExecute transaction fields definition L2_fields = [ Field( @@ -2735,10 +2438,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.UINT, data_path=DataPath( 1, - [ - PathTuple(1), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_value"] ), type_size=32, ), @@ -2759,19 +2459,14 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): TypeFamily.ADDRESS, data_path=DataPath( 1, - [ - PathTuple(0), - PathLeaf(PathLeafType.STATIC), - ] + param_paths["_to"] ), ) ) ), ] # compute instructions hash - L2_hash = hashlib.sha3_256() - for sub_field in L2_fields: - L2_hash.update(sub_field.serialize()) + L2_hash = compute_inst_hash(L2_fields) # Define lower batchExecute transaction info L2_tx_info = [ @@ -2780,7 +2475,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): tx_params["chainId"], tokens[0]["address"], get_selector_from_data(tokenData0), - L2_hash.digest(), + L2_hash, "Send", contract_name="USD_Coin", ), @@ -2789,7 +2484,7 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): tx_params["chainId"], tokens[1]["address"], get_selector_from_data(tokenData1), - L2_hash.digest(), + L2_hash, "Send", contract_name="USD_Coin", ) @@ -2840,4 +2535,4 @@ def test_gcs_batch_2(scenario_navigator: NavigateWithScenario, test_name: str): # Send the full transaction with app_client.sign(mode=SignMode.START_FLOW): - scenario_navigator.review_approve(test_name=test_name) + scenario_navigator.review_approve() diff --git a/tests/ragger/test_tx_simulation.py b/tests/ragger/test_tx_simulation.py index bfb4dd0ef..2b8674083 100644 --- a/tests/ragger/test_tx_simulation.py +++ b/tests/ragger/test_tx_simulation.py @@ -142,10 +142,7 @@ def test_tx_simulation_enabled(backend: BackendInterface, navigator: Navigator) __handle_simulation(app_client, simu_params) -def test_tx_simulation_sign(navigator: Navigator, - scenario_navigator: NavigateWithScenario, - test_name: str, - config: str) -> None: +def test_tx_simulation_sign(scenario_navigator: NavigateWithScenario, config: str) -> None: """Test the TX Simulation APDU with a simple transaction""" backend = scenario_navigator.backend @@ -155,7 +152,7 @@ def test_tx_simulation_sign(navigator: Navigator, if device.is_nano: pytest.skip("Not yet supported on Nano") - __common_setting_handling(device, navigator, app_client, True) + __common_setting_handling(device, scenario_navigator.navigator, app_client, True) tx_params: dict = { "nonce": 21, @@ -174,13 +171,11 @@ def test_tx_simulation_sign(navigator: Navigator, sign_tx_common(scenario_navigator, tx_params, - test_name + f"_{config}", + scenario_navigator.test_name + f"_{config}", with_simu=config not in ("benign", "issue")) -def test_tx_simulation_no_simu(navigator: Navigator, - scenario_navigator: NavigateWithScenario, - test_name: str) -> None: +def test_tx_simulation_no_simu(scenario_navigator: NavigateWithScenario) -> None: """Test the TX Transaction APDU without TX Simulation APDU but with the TRANSACTION_CHECKS setting enabled""" @@ -191,7 +186,7 @@ def test_tx_simulation_no_simu(navigator: Navigator, if device.is_nano: pytest.skip("Not yet supported on Nano") - __common_setting_handling(device, navigator, app_client, True) + __common_setting_handling(device, scenario_navigator.navigator, app_client, True) tx_params: dict = { "nonce": 21, @@ -204,13 +199,11 @@ def test_tx_simulation_no_simu(navigator: Navigator, sign_tx_common(scenario_navigator, tx_params, - test_name, + scenario_navigator.test_name, with_simu=False) -def test_tx_simulation_nft(navigator: Navigator, - scenario_navigator: NavigateWithScenario, - test_name: str) -> None: +def test_tx_simulation_nft(scenario_navigator: NavigateWithScenario) -> None: """Test the TX Simulation APDU with a Plugin & NFT transaction""" backend = scenario_navigator.backend @@ -220,12 +213,12 @@ def test_tx_simulation_nft(navigator: Navigator, if device.is_nano: pytest.skip("Not yet supported on Nano") - __common_setting_handling(device, navigator, app_client, True) + __common_setting_handling(device, scenario_navigator.navigator, app_client, True) simu_params = __get_simu_params("warning", SimuType.TRANSACTION) common_test_nft(scenario_navigator, - test_name, + scenario_navigator.test_name, collecs_721[0], actions_721[0], False, @@ -233,13 +226,11 @@ def test_tx_simulation_nft(navigator: Navigator, simu_params) -def test_tx_simulation_blind_sign(navigator: Navigator, - scenario_navigator: NavigateWithScenario, - test_name: str, - config: str) -> None: +def test_tx_simulation_blind_sign(scenario_navigator: NavigateWithScenario, config: str) -> None: """Test the TX Simulation APDU with a Blind Sign transaction""" backend = scenario_navigator.backend + navigator = scenario_navigator.navigator app_client = EthAppClient(backend) device = backend.device @@ -255,16 +246,13 @@ def test_tx_simulation_blind_sign(navigator: Navigator, blind_sign(navigator, scenario_navigator, - test_name + f"_{config}", + scenario_navigator.test_name + f"_{config}", False, 0.0, simu_params) -def test_tx_simulation_eip191(navigator: Navigator, - scenario_navigator: NavigateWithScenario, - test_name: str, - config: str) -> None: +def test_tx_simulation_eip191(scenario_navigator: NavigateWithScenario, config: str) -> None: """Test the TX Simulation APDU with a Message Streaming based on EIP191""" # TODO Re-activate when partners are ready for eip191 @@ -279,8 +267,9 @@ def test_tx_simulation_eip191(navigator: Navigator, if config in ("benign", "warning"): pytest.skip("Skipping useless tests") - __common_setting_handling(device, navigator, app_client, True) + __common_setting_handling(device, scenario_navigator.navigator, app_client, True) + test_name = scenario_navigator.test_name simu_params = __get_simu_params(config, SimuType.PERSONAL_MESSAGE) msg = "Example `personal_sign` message with TX Simulation" if config == "issue": @@ -294,7 +283,7 @@ def test_tx_simulation_eip191(navigator: Navigator, simu_params) -def test_tx_simulation_eip712(scenario_navigator: NavigateWithScenario, test_name: str) -> None: +def test_tx_simulation_eip712(scenario_navigator: NavigateWithScenario) -> None: """Test the TX Simulation APDU with a Message Streaming based on EIP712""" app_client = EthAppClient(scenario_navigator.backend) @@ -306,9 +295,7 @@ def test_tx_simulation_eip712(scenario_navigator: NavigateWithScenario, test_nam simu_params = __get_simu_params("threat", SimuType.TYPED_DATA) - sign_eip712(scenario_navigator, - test_name, - simu_params) + sign_eip712(scenario_navigator, simu_params) def test_tx_simulation_eip712_v0(scenario_navigator: NavigateWithScenario) -> None: @@ -327,8 +314,7 @@ def test_tx_simulation_eip712_v0(scenario_navigator: NavigateWithScenario) -> No def test_tx_simulation_gcs(navigator: Navigator, - scenario_navigator: NavigateWithScenario, - test_name: str) -> None: + scenario_navigator: NavigateWithScenario) -> None: """Test the TX Simulation APDU with a Message Streaming based on EIP712""" backend = scenario_navigator.backend @@ -342,4 +328,4 @@ def test_tx_simulation_gcs(navigator: Navigator, simu_params = __get_simu_params("warning", SimuType.TRANSACTION) - sign_gcs_poap(scenario_navigator, test_name, simu_params) + sign_gcs_poap(scenario_navigator, simu_params) diff --git a/tools/client b/tools/client new file mode 120000 index 000000000..e3f224674 --- /dev/null +++ b/tools/client @@ -0,0 +1 @@ +../client/src/ledger_app_clients/ethereum/ \ No newline at end of file diff --git a/tools/decode_apdu.py b/tools/decode_apdu.py new file mode 100755 index 000000000..b471124ee --- /dev/null +++ b/tools/decode_apdu.py @@ -0,0 +1,1089 @@ +#!/usr/bin/env python3 +"""Decode APDU replay file to extract transaction details.""" + +from pathlib import Path +import argparse +import logging +from typing import Callable, Optional +import rlp +import re +import requests +from eth_utils import to_int +from eth_abi import decode + +from generate_selector_cache import gen_selector_cache + +from client.command_builder import InsType, P1Type, P2Type +from client.tlv import FieldTag as TLVFieldTag +from client.gcs import TxInfoTag as TagTransactionInfo, FieldTag as TagTransactionField +from client.enum_value import Tag as TagEnumValue +from client.eip712.struct import EIP712FieldType as StructFieldType, EIP712TypeDescMask as StructTypeDescMask + + +APP_CLA: int = 0xE0 + +MAX_BYTES_LEN: int = 70 # Max bytes to be logged fully + +# Local selector cache at module level +LOCAL_SELECTORS = {} +CACHE_FILE = Path(__file__).parent / "function_selectors.json" + +logger = logging.getLogger(__name__) + +transaction = { + "BIP32 path": "", # Formatted BIP32 path + "RLP transaction": bytes(), # Full RLP bytes data + "TX params": {}, # Decoded Transaction fields +} + +tlv_info = { + "Struct": {}, # Temporary TLV payload data and remaining length + "TLV": {}, # Decoded TLV fields +} + + +# =============================================================================== +# Parameters +# =============================================================================== +def init_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Decode APDU replay file to extract transaction details.") + parser.add_argument("--input", "-i", required=True, help="Input apdu replay file.") + parser.add_argument("--verbose", "-v", action='store_true', help="Verbose mode") + return parser + + +# =============================================================================== +# Logging +# =============================================================================== +def set_logging(verbose: bool = False) -> None: + if verbose: + logger.setLevel(level=logging.DEBUG) + else: + logger.setLevel(level=logging.INFO) + logger.handlers.clear() + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s")) + logger.addHandler(handler) + + +def print_title(title: str) -> None: + logger.info("=" * 60) + logger.info(f"DECODED {title.upper()}") + logger.info("=" * 60) + + +# =============================================================================== +# BIP32 Path Formatting +# =============================================================================== +def format_bip32_path(path_indices: list) -> str: + """Convert BIP32 path indices to readable format (m/44'/60'/0'/0/0).""" + BIP32_HARDENED = 0x80000000 + path_parts = ["m"] + + for index in path_indices: + if index >= BIP32_HARDENED: + path_parts.append(f"{index - BIP32_HARDENED}'") + else: + path_parts.append(str(index)) + + return "/".join(path_parts) + + +# =============================================================================== +# Selector function Decoding +# =============================================================================== +def load_selector_cache() -> None: + """Load function selector cache from JSON file.""" + global LOCAL_SELECTORS + LOCAL_SELECTORS = gen_selector_cache(logger=logger) + assert LOCAL_SELECTORS + + +def decode_function_selector(selector: str) -> str: + """Decode function selector using local cache first, then API. + + Priority: + 1. Local cache (function_selectors.json) + 2. Online API (4byte.directory) + + Args: + selector: 8-character hex string (without 0x prefix) + + Returns: + Function signature or "Unknown (0x...)" + """ + if not selector or len(selector) != 8: + logger.error("Invalid selector %s", selector) + return "Invalid selector" + + selector = selector.lower() + + # 1. Check local cache first + if selector in LOCAL_SELECTORS: + logger.debug(f"Found selector {selector} in local cache") + return LOCAL_SELECTORS[selector] + + # 2. Try online API as fallback + logger.debug(f"Selector {selector} not in cache, querying API...") + try: + url = f"https://www.4byte.directory/api/v1/signatures/?hex_signature=0x{selector}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + data = response.json() + if data.get('results'): + signature = data['results'][0]['text_signature'] + logger.debug(f"Found selector {selector} online: {signature}") + return signature + logger.debug(f"Failed to retrieve selector {selector} online ({response.status_code})") + except Exception as e: + logger.error(f"Failed to query API for selector {selector}: {e}") + + return f"Unknown (0x{selector})" + + +# =============================================================================== +# DER fields Decoding +# =============================================================================== +def der_decode(data: bytes, offset: int = 0) -> tuple[int, int]: + """Decode a DER-encoded integer from bytes. + + Args: + data: The bytes to decode from + offset: Starting offset in the data + + Returns: + tuple[int, int]: (decoded_value, new_offset) + - decoded_value: The decoded integer value + - new_offset: The offset after reading the value + """ + if offset >= len(data): + raise ValueError("Offset out of bounds") + + first_byte = data[offset] + + # Check if this is a multi-byte length encoding + if first_byte & 0x80: + # The lower 7 bits indicate how many bytes encode the length + num_length_bytes = first_byte & 0x7F + if offset + 1 + num_length_bytes > len(data): + raise ValueError("Invalid DER encoding: not enough bytes for length") + + # Read the actual value from the following bytes + value = int.from_bytes(data[offset + 1:offset + 1 + num_length_bytes], 'big') + new_offset = offset + 1 + num_length_bytes + else: + # Single byte value + value = first_byte + new_offset = offset + 1 + + return value, new_offset + + +# =============================================================================== +# ABI Parameter Parsing +# =============================================================================== +def parse_function_signature(signature: str) -> tuple: + """Parse function signature to extract parameter types. + + Args: + signature: Function signature string. + + Example: + - execTransaction(address,uint256,bytes,uint8,...) + - batchExecute((address,uint256,bytes)[]) + + Returns: ('function_name', ['param_type1', 'param_type2', ...]) + """ + match = re.match(r'(\w+)\((.*)\)', signature) + if not match: + return None, [] + + function_name = match.group(1) + params_str = match.group(2) + + if not params_str: + return function_name, [] + + # Parse parameters (handle nested types like bytes, arrays, tuples) + params = [] + depth = 0 + current_param = "" + + for char in params_str: + if char == '(' or char == '[': + depth += 1 + current_param += char + elif char == ')' or char == ']': + depth -= 1 + current_param += char + elif char == ',' and depth == 0: + params.append(current_param.strip()) + current_param = "" + else: + current_param += char + + if current_param: + params.append(current_param.strip()) + + return function_name, params + + +def format_decoded_value(param_type: str, value, indent: int = 0) -> str: + """Format a decoded value based on its type. + + Args: + param_type: ABI type (e.g., 'address', 'uint256', '(address,uint256,bytes)[]') + value: Decoded value + indent: Indentation level for nested structures + + Returns: + Formatted string representation + """ + prefix = " " * indent + + # Handle arrays (including tuple arrays) + if param_type.endswith('[]'): + if not value: + return "[]" + + inner_type = param_type[:-2] # Remove '[]' + result = f"{prefix}\n" + + for i, item in enumerate(value): + result += f"{prefix} [{i}]:\n" + formatted_item = format_decoded_value(inner_type, item, indent + 2) + result += formatted_item + "\n" + + result += f"{prefix}" + return result + + # Handle tuples + if param_type.startswith('(') and param_type.endswith(')'): + # Extract tuple component types + inner = param_type[1:-1] # Remove parentheses + component_types = [] + depth = 0 + current = "" + + for char in inner: + if char in '([': + depth += 1 + current += char + elif char in ')]': + depth -= 1 + current += char + elif char == ',' and depth == 0: + component_types.append(current.strip()) + current = "" + else: + current += char + + if current: + component_types.append(current.strip()) + + # Format tuple components + if not isinstance(value, (list, tuple)): + return f"{prefix}{value}" + + result = f"{prefix}\n" + for i, (comp_type, comp_value) in enumerate(zip(component_types, value)): + type_name = comp_type.split('[')[0] # Get base type name + if comp_type.startswith('bytes') and isinstance(comp_value, bytes) and len(comp_value.hex()) > MAX_BYTES_LEN: + result += f"{prefix} {type_name}[{len(comp_value)}]: " + else: + result += f"{prefix} {type_name}: " + # Format the component value + if comp_type.startswith('(') or comp_type.endswith('[]'): + result += "\n" + format_decoded_value(comp_type, comp_value, indent + 2) + else: + formatted = format_decoded_value(comp_type, comp_value, 0) + result += formatted.strip() + result += "\n" + + result += f"{prefix}" + return result + + # Handle basic types + if param_type == 'address': + if isinstance(value, bytes): + return f"{prefix}0x{value.hex()}" + return f"{prefix}{value}" + if param_type.startswith('bytes'): + if isinstance(value, bytes): + return f"{prefix}0x{value.hex()}" + return f"{prefix}{value}" + if param_type.startswith('uint') or param_type.startswith('int'): + return f"{prefix}{int(value)}" + if isinstance(value, bytes): + return f"{prefix}0x{value.hex()}" + return f"{prefix}{value}" + + +def decode_function_data(selector: str, data: bytes) -> dict: + """Decode function call data using the function signature. + + Returns: + Dictionary mapping parameter names to decoded values + """ + + if not selector or selector.startswith("Unknown") or \ + selector == "Invalid selector": + logger.debug("Cannot decode function data: invalid or unknown selector") + return {} + + try: + # Extract function signature and parameter types + selector_decoded = selector.split(' - ')[1] + function_name, param_types = parse_function_signature(selector_decoded) + + if not param_types: + logger.debug(f"Function {function_name} has no parameters to decode") # Améliorer le message + return {} + + # Decode the data + decoded_params = decode(param_types, data) + + # Create a dictionary with parameter names + result = {} + for i, (param_type, value) in enumerate(zip(param_types, decoded_params)): + # Create descriptive parameter name + if param_type.startswith('(') and param_type.endswith('[]'): + param_name = f"param_{i} (array of structs)" + elif param_type.startswith('('): + param_name = f"param_{i} (struct)" + elif param_type == "bytes": + # Try to extract the included function selector if present + if isinstance(value, bytes) and len(value) >= 4: + inner_selector = value[:4].hex() + inner_signature = decode_function_selector(inner_selector) + if inner_signature.startswith("Unknown") or inner_signature == "Invalid selector": + param_name = f"param_{i} (bytes)" + else: + param_name = f"param_{i} (bytes, 0x{inner_selector} - {inner_signature})" + value = value[4:] # Remove inner selector for decoding + # Decode inner function data + inner_selector = f"0x{inner_selector} - {inner_signature}" + decoded_params = decode_function_data(inner_selector, value) + format_decoded_params(decoded_params, f"'param_{i}' Function Parameters (bytes)") + else: + param_name = f"param_{i} ({param_type})" + else: + param_name = f"param_{i} ({param_type})" + + # Format the value + result[param_name] = format_decoded_value(param_type, value) + + return result + + except Exception as e: + logger.error(f"Failed to decode function data: {e}") + return {} + + +def format_decoded_params(params: dict, title: str) -> None: + """Format and log decoded parameters in a readable way.""" + if not params: + return + + print_title(title) + + for param_name, value in params.items(): + # For nested structures (arrays, tuples), log directly without prefix + if isinstance(value, str) and ('\n' in value or value.strip().startswith('[')): + logger.info(f"{param_name}:") + # Split and log each line separately + for line in value.split('\n'): + if line.strip(): + logger.info(line) + elif isinstance(value, str): + length = len(value.replace("0x", "")) + if value.startswith("0x"): + length = (length // 2) # Hex string length adjustment without '0x' + if param_name != "selector" and length > MAX_BYTES_LEN: + logger.info(f"{param_name}[{length}]: {value[:MAX_BYTES_LEN]}...") + logger.debug(f"{param_name}[{length}]: {value}") + elif value: + logger.info(f"{param_name}: {value}") + elif isinstance(value, bytes): + value_hex = f"0x{value.hex()}" + length = len(value) + if length > MAX_BYTES_LEN: + logger.info(f"{param_name}[{length}]: {value_hex[:MAX_BYTES_LEN]}...") + logger.debug(f"{param_name}[{length}]: {value_hex}") + elif value: + logger.info(f"{param_name}: {value_hex}") + else: + logger.info(f"{param_name}: {value}") + + +# =============================================================================== +# Value Formatting Helpers +# =============================================================================== +def format_gas_price(value_wei: int) -> str: + """Format gas price in wei, gwei, and ether.""" + if value_wei == 0: + return "0 wei" + + gwei = value_wei / 1e9 + ether = value_wei / 1e18 + + if gwei < 1: + return f"{value_wei} wei" + elif gwei < 1000: + return f"{gwei:.2f} gwei ({value_wei} wei)" + else: + return f"{gwei:.2f} gwei ({ether:.6f} ETH)" + + +def format_amount(value_wei: int) -> str: + """Format amount in wei and ether.""" + if value_wei == 0: + return "0 wei (0 ETH)" + + ether = value_wei / 1e18 + + if ether < 0.000001: + return f"{value_wei} wei" + else: + return f"{value_wei} wei ({ether:.6f} ETH)" + + +def format_gas_limit(value: int) -> str: + """Format gas limit with common reference values.""" + references = { + 21000: "21,000 (simple transfer)", + 65000: "~65,000 (token transfer)", + 100000: "~100,000 (basic contract)", + } + + # Find closest reference + closest = min(references.keys(), key=lambda x: abs(x - value)) + if abs(closest - value) < 10000: + return f"{value:,} ≈ {references[closest]}" + + return f"{value:,}" + + +# =============================================================================== +# RLP Transaction Decoding +# =============================================================================== +def decode_rlp_transaction(rlp_bytes: bytes) -> None: + """Decode RLP-encoded Ethereum transaction, based on EIP-1559 + https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1559.md#abstract + + Args: + rlp_bytes: RLP-encoded transaction bytes + """ + + decoded_tx = rlp.decode(rlp_bytes[1:]) # EIP-1559: Skip prefix byte + nb_field = len(decoded_tx) + + # Extract selector if present + selector = "" + selector_decoded = "" + if nb_field > 7 and decoded_tx[7] and len(decoded_tx[7]) >= 4: + selector = decoded_tx[7][:4].hex() + selector_decoded = decode_function_selector(selector) + + # Extract raw values + chain_id = to_int(decoded_tx[0]) if nb_field > 0 and decoded_tx[0] else 0 + nonce = to_int(decoded_tx[1]) if nb_field > 1 and decoded_tx[1] else 0 + max_priority_fee = to_int(decoded_tx[2]) if nb_field > 2 and decoded_tx[2] else 0 + max_fee = to_int(decoded_tx[3]) if nb_field > 3 and decoded_tx[3] else 0 + gas_limit = to_int(decoded_tx[4]) if nb_field > 4 and decoded_tx[4] else 0 + to_address = to_int(decoded_tx[5]) if nb_field > 5 and decoded_tx[5] else 0 + amount = to_int(decoded_tx[6]) if nb_field > 6 and decoded_tx[6] else 0 + + transaction["TX params"] = { + "chainId": chain_id, + "nonce": nonce, + "maxPriorityFeePerGas": f"{max_priority_fee} -> {format_gas_price(max_priority_fee)}", + "maxFeePerGas": f"{max_fee} -> {format_gas_price(max_fee)}", + "gas": f"{gas_limit} -> {format_gas_limit(gas_limit)}", + "to": f"0x{to_address:040x}" if to_address else "0x0", + "amount": f"{amount} -> {format_amount(amount)}", + "selector": f"0x{selector} - {selector_decoded}", + "data": decoded_tx[7][4:] if nb_field > 7 and decoded_tx[7] else bytes(), + "access_list": decoded_tx[8].hex() if nb_field > 8 and decoded_tx[8] else "", + "signature_y_parity": decoded_tx[9].hex() if nb_field > 9 and decoded_tx[9] else "", + "signature_r_parity": decoded_tx[10].hex() if nb_field > 10 and decoded_tx[10] else "", + "signature_s_parity": decoded_tx[11].hex() if nb_field > 11 and decoded_tx[11] else "", + } + + +# =============================================================================== +# Generic TLV decoding +# =============================================================================== +def decode_generic_tlv(tag_enum_class: type, + string_tags: list = None, + number_tags: list = None, + selector_tags: list = None, + skip_tags: list = None) -> None: + """Decode TLV-encoded structure with generic tag handling. + + Args: + tag_enum_class: The IntEnum class to use for tags (e.g., TagTransactionInfo) + string_tags: List of tags that should be decoded as UTF-8 strings + number_tags: List of tags that should be decoded as integers + selector_tags: List of tags that should be decoded as function selectors + skip_tags: List of tags that should be skipped (not stored) + """ + if string_tags is None: + string_tags = [] + if skip_tags is None: + skip_tags = [] + if number_tags is None: + number_tags = [] + if selector_tags is None: + selector_tags = [] + + tlv_hex = tlv_info["Struct"]["Data"] + tlv_len = len(tlv_hex) + offset = 0 + + while offset < tlv_len: + try: + # Decode tag + tag_value, offset = der_decode(tlv_hex, offset) + tag = tag_enum_class(tag_value) + + # Decode length + length, offset = der_decode(tlv_hex, offset) + + # Extract value + value = tlv_hex[offset:offset + length] + offset += length + + # Skip tags that should not be displayed + if tag in skip_tags: + continue + + # Decode value based on tag type + if tag in number_tags: + val_int = to_int(value) + decoded_value = f"0x{val_int:x} -> {val_int}" + elif tag in string_tags: + try: + decoded_value = value.decode('utf-8') + except UnicodeDecodeError: + decoded_value = f"0x{value.hex()}" + elif tag in selector_tags: + decoded_value = f"0x{value.hex()} - {decode_function_selector(value.hex())}" + else: + decoded_value = f"0x{value.hex()}" + + # Store with tag name as key + tlv_info["TLV"][tag.name] = decoded_value + + except (ValueError, IndexError) as e: + logger.error(f"Failed to decode TLV at offset {offset}: {e}") + break + + +# =============================================================================== +# Decode APDU SIGN +# =============================================================================== +def decode_sign_apdu(apdu: bytes) -> None: + """Decode APDU SIGN command for Ethereum transaction signing. + Args: + apdu: APDU command (without "0x" prefix) + """ + + p1 = to_int(apdu[2]) + p2 = to_int(apdu[3]) + lc = int(apdu[4]) + data = apdu[5:5 + lc] + + if p2 == P2Type.SIGN_STORE: + if p1 == P1Type.SIGN_FIRST_CHUNK: + # Extract derivation path + path_length = to_int(data[0]) + data = data[1:] # Skip path length byte + derived_path = [] + for _ in range(path_length): + index = to_int(data[:4]) + derived_path.append(index) + data = data[4:] # Skip 4 bytes (8 hex chars) for each index + # Format and display the path + transaction["BIP32 path"] = format_bip32_path(derived_path) + + # The remaining data is the RLP-encoded transaction + transaction["RLP transaction"] += data + elif p2 == P2Type.SIGN_START: + print_title("SIGN APDU") + + decode_rlp_transaction(transaction["RLP transaction"]) + + # Log transaction fields + logger.info(f"BIP32 path: {transaction['BIP32 path']}") + logger.info(f"RLP transaction[{len(transaction['RLP transaction'])}]: 0x{transaction['RLP transaction'].hex()[:MAX_BYTES_LEN]}...") + format_decoded_params(transaction["TX params"], "Transaction Params") + + # Print 32-bytes length chunks of RLP transaction + rlp_tx = transaction["TX params"]["data"] + logger.debug("=" * 60) + logger.debug("RLP TRANSACTION DATA (32-byte chunks)") + logger.debug("=" * 60) + for i in range(0, len(rlp_tx), 32): + chunk_index = i // 32 + logger.debug(f"Chunk {chunk_index:3d} [{i:4d}-{i+31:4d}]: {rlp_tx[i:i+32].hex()}") + + # Decode function parameters if available + decoded_params = decode_function_data(transaction["TX params"]["selector"], transaction["TX params"]["data"]) + format_decoded_params(decoded_params, "Function Parameters") + + +# =============================================================================== +# Decode EIP712 APDU +# =============================================================================== +def decode_eip712_struct_def_apdu(apdu: bytes) -> None: + """Decode APDU EIP712 STRUCT DEFINITION command for Ethereum transaction signing. + Args: + apdu: APDU command (without "0x" prefix) + """ + + p2 = to_int(apdu[3]) + lc = int(apdu[4]) + data = apdu[5:5 + lc] + + print_title("EIP712 STRUCT FIELD") + if p2 == P2Type.STRUCT_NAME: + logger.info(f" STRUCT NAME: {data.decode('utf-8')}") + + elif p2 == P2Type.STRUCT_FIELD: + typeDesc = data[0] + data = data[1:] + typeValue: StructFieldType = typeDesc & StructTypeDescMask.TYPE + + if typeValue == StructFieldType.CUSTOM: + # Custom type name + size = to_int(data[0]) + data = data[1:] + name = data[:size].decode('utf-8') + logger.info(f" Field Name: {name}") + data = data[size:] + + if typeDesc & StructTypeDescMask.SIZE: + # Type size + size = to_int(data[0]) + data = data[1:] + logger.info(f" Field Size [{StructFieldType(typeValue).name}]: {size}") + + if typeDesc & StructTypeDescMask.ARRAY: + # Array levels + levels_count = to_int(data[0]) + data = data[1:] + logger.info(f" Field ArrayLevels: {levels_count}") + + keyNameLen = to_int(data[0]) + data = data[1:] + keyName = data[:keyNameLen] + data = data[keyNameLen:] + logger.info(f" Field KeyName[{keyNameLen}]: {keyName.decode('utf-8')}") + + +def decode_eip712_struct_impl_apdu(apdu: bytes) -> None: + """Decode APDU EIP712 STRUCT IMPLEMENTATION command for Ethereum transaction signing. + Args: + apdu: APDU command (without "0x" prefix) + """ + + p1 = to_int(apdu[2]) + p2 = to_int(apdu[3]) + lc = int(apdu[4]) + data = apdu[5:5 + lc] + + print_title("EIP712 STRUCT IMPLEMENTATION") + if p2 == P2Type.STRUCT_NAME: + logger.info(f" STRUCT NAME: {data.decode('utf-8')}") + + elif p2 == P2Type.ARRAY: + size = data[0] + data = data[1:] + logger.info(f" ARRAY SIZE: {size}") + + elif p2 == P2Type.STRUCT_FIELD: + size = to_int(apdu[0:2]) + data = data[2:] + value = data[:size] + logger.info(f" STRUCT FIELD VALUE: {value.hex()}") + + +def __paramPresence(value: int) -> str: + if value == 0: + return "None" + if value == 1: + return "Present (filtered message field)" + elif value == 2: + return "Present (domain’s verifyingContract)" + return "Unknown" + + +def decode_eip712_filtering_apdu(apdu: bytes) -> None: + """Decode APDU EIP712 FILTERING command for Ethereum transaction signing. + Args: + apdu: APDU command (without "0x" prefix) + """ + + p1 = to_int(apdu[2]) + p2 = to_int(apdu[3]) + lc = int(apdu[4]) + data = apdu[5:5 + lc] + + print_title("EIP712 FILTERING") + if p2 == P2Type.FILTERING_DISCARDED_PATH: + size = data[0] + data = data[1:] + value = data[:size] + logger.info(f" PATH[{size}]: {value.hex()}") + + elif p2 == P2Type.FILTERING_MESSAGE_INFO: + size = data[0] + data = data[1:] + value = data[:size] + data = data[size:] + logger.info(f" MESSAGE INFO[{size}]: {value.decode('utf-8')}") + filters = to_int(data[0]) + logger.info(f" FILTERS COUNT: {filters}") + + elif p2 in (P2Type.FILTERING_CALLDATA_SPENDER, + P2Type.FILTERING_CALLDATA_AMOUNT, + P2Type.FILTERING_CALLDATA_SELECTOR, + P2Type.FILTERING_CALLDATA_CHAIN_ID, + P2Type.FILTERING_CALLDATA_CALLEE, + P2Type.FILTERING_CALLDATA_VALUE, + P2Type.FILTERING_AMOUNT_JOIN_TOKEN): + logger.info(f" CALLDATA {P2Type(p2).name.split('_')[-1]} index: {to_int(data[0])}") + + elif p2 == P2Type.FILTERING_CALLDATA_INFO: + logger.info(f" CALLDATA INFO index: {to_int(data[0])}") + logger.info(f" CALLDATA INFO value filter flag: {str(bool(data[1]))}") + logger.info(f" CALLDATA INFO callee filter flag: {__paramPresence(data[2])}") + logger.info(f" CALLDATA INFO ChaindId filter flag: {str(bool(data[3]))}") + logger.info(f" CALLDATA INFO Selector filter flag: {str(bool(data[4]))}") + logger.info(f" CALLDATA INFO Amount filter flag: {str(bool(data[5]))}") + logger.info(f" CALLDATA INFO Spender filter flag: {__paramPresence(data[6])}") + + elif p2 == P2Type.FILTERING_TRUSTED_NAME: + size = data[0] + data = data[1:] + value = data[:size] + data = data[size:] + logger.info(f" CALLDATA TRUSTED NAME[{size}]: {value.decode('utf-8')}") + for elt in ("TYPES", "SOURCES"): + size = data[0] + data = data[1:] + value = data[:size] + data = data[size:] + logger.info(f" CALLDATA TRUSTED {elt}[{size}]: {value.hex()}") + + elif p2 in (P2Type.FILTERING_DATETIME, P2Type.FILTERING_RAW, P2Type.FILTERING_AMOUNT_JOIN_VALUE): + size = data[0] + data = data[1:] + value = data[:size] + logger.info(f" CALLDATA {P2Type(p2).name.split('_')[-1]} Name: {value.decode('utf-8')}") + if p2 == P2Type.FILTERING_AMOUNT_JOIN_VALUE: + data = data[size:] + logger.info(f" CALLDATA VALUE TOKEN INDEX: {to_int(data[0])}") + + elif p2 == P2Type.FILTERING_AMOUNT_JOIN_VALUE: + size = data[0] + data = data[1:] + logger.info(f" AMOUNT FIELD SIZE: {size}") + + +def decode_sign_eip712_apdu(apdu: bytes) -> None: + """Decode APDU SIGN command for Ethereum EIP712 transaction signing. + Args: + apdu: APDU command (without "0x" prefix) + """ + + p2 = to_int(apdu[3]) + lc = int(apdu[4]) + data = apdu[5:5 + lc] + + print_title("EIP712 SIGNING") + # Extract derivation path + path_length = to_int(data[0]) + data = data[1:] # Skip path length byte + derived_path = [] + for _ in range(path_length): + index = to_int(data[:4]) + derived_path.append(index) + data = data[4:] # Skip 4 bytes (8 hex chars) for each index + # Format and display the path + logger.info(f"BIP32 path: {format_bip32_path(derived_path)}") + + if p2 == P2Type.V0_IMPLEM: + domain_hash = data[:32] + data = data[32:] + message_hash = data[:32] + logger.info(f"Domain Separator Hash: 0x{domain_hash.hex()}") + logger.info(f"Message Hash: 0x{message_hash.hex()}") + + +# =============================================================================== +# Transaction Info TLV decoding +# =============================================================================== +def decode_struct_information_tlv() -> None: + """Decode TLV-encoded Transaction Info structure""" + + string_tags = [ + TagTransactionInfo.CREATOR_NAME, + TagTransactionInfo.CREATOR_LEGAL_NAME, + TagTransactionInfo.CREATOR_URL, + TagTransactionInfo.CONTRACT_NAME, + TagTransactionInfo.DEPLOY_DATE, + TagTransactionInfo.OPERATION_TYPE, + ] + skip_tags = [ + TagTransactionInfo.SIGNATURE, + TagTransactionInfo.FIELDS_HASH, + ] + number_tags = [ + TagTransactionInfo.CHAIN_ID, + ] + selector_tags = [ + TagTransactionInfo.SELECTOR, + ] + decode_generic_tlv(TagTransactionInfo, string_tags, number_tags, selector_tags, skip_tags) + + +# =============================================================================== +# Transaction Field TLV decoding +# =============================================================================== +def decode_struct_field_tlv() -> None: + """Decode TLV-encoded Transaction Field structure""" + + string_tags = [ + TagTransactionField.NAME, + ] + decode_generic_tlv(TagTransactionField, string_tags) + + +# =============================================================================== +# Enum Value TLV decoding +# =============================================================================== +def decode_enum_value_tlv() -> None: + """Decode TLV-encoded Enum Value structure""" + + string_tags = [ + TagEnumValue.NAME, + ] + skip_tags = [ + TagEnumValue.SIGNATURE, + ] + number_tags = [ + TagEnumValue.CHAIN_ID, + ] + selector_tags = [ + TagEnumValue.SELECTOR, + ] + decode_generic_tlv(TagEnumValue, string_tags, number_tags, selector_tags, skip_tags) + + +# =============================================================================== +# Standard payload TLV decoding +# =============================================================================== +def decode_tlv() -> None: + """Decode TLV-encoded payload""" + + string_tags = [ + TLVFieldTag.TICKER, + TLVFieldTag.NETWORK_NAME, + TLVFieldTag.TX_CHECKS_PROVIDER_MSG, + TLVFieldTag.TX_CHECKS_TINY_URL, + ] + skip_tags = [ + TLVFieldTag.CHALLENGE, + TLVFieldTag.DER_SIGNATURE, + TLVFieldTag.TX_HASH, + TLVFieldTag.DOMAIN_HASH, + TLVFieldTag.NETWORK_ICON_HASH, + ] + number_tags = [ + TLVFieldTag.CHAIN_ID, + TLVFieldTag.SIGNER_KEY_ID, + TLVFieldTag.SIGNER_ALGO, + TLVFieldTag.COIN_TYPE, + TLVFieldTag.THRESHOLD, + TLVFieldTag.SIGNERS_COUNT, + ] + selector_tags = [ + TLVFieldTag.SELECTOR, + ] + decode_generic_tlv(TLVFieldTag, string_tags, number_tags, selector_tags, skip_tags) + + +# =============================================================================== +# Generic TLV payload parser +# =============================================================================== +def decode_tlv_apdu(apdu: bytes, tlv_parser: Callable, title: str) -> None: + """Decode APDU for providing TLV-encoded structure information. + + Args: + apdu: Hex string of the APDU command (without "0x" prefix) + tlv_parser: Function to parse the TLV structure + title: Title for logging + """ + + p1 = to_int(apdu[2]) + lc = int(apdu[4]) + data = apdu[5:5 + lc] + + tlv_info["TLV"].clear() + + if p1 == P1Type.FIRST_CHUNK: + # Extract derivation path + tlv_info["Struct"]["Length"] = to_int(data[0:2]) + tlv_info["Struct"]["Data"] = data[2:] # Skip length + tlv_info["Struct"]["Length"] -= (lc - 2) # Adjust remaining length + else: + tlv_info["Struct"]["Data"] += data + tlv_info["Struct"]["Length"] -= lc # Adjust remaining length + + # Check if all chunks received + if tlv_info["Struct"]["Length"] == 0: + # All chunks received, decode struct information + tlv_parser() + format_decoded_params(tlv_info["TLV"], title) + + +# =============================================================================== +# Decode APDU ERC20 Token Information +# =============================================================================== +def decode_erc20_token_apdu(apdu: bytes) -> None: + """Decode APDU for providing ERC20 token information.""" + lc = int(apdu[4]) + data = apdu[5:5 + lc] + + # Simulte a TLV for logging purpose + tlv_info["TLV"].clear() + + # Ticker length + ticker_length = int(data[0]) + data = data[1:] + tlv_info["TLV"]["Ticker"] = data[:ticker_length].decode('utf-8') + data = data[ticker_length:] + # Address + tlv_info["TLV"]["Address"] = f"0x{data[:20].hex()}" + data = data[20:] + # Decimals + tlv_info["TLV"]["Decimals"] = to_int(data[0:4]) + data = data[4:] + # Chain ID + tlv_info["TLV"]["Chain ID"] = to_int(data[0:4]) + + format_decoded_params(tlv_info["TLV"], "ERC20 token") + + +# =============================================================================== +# APDU File Parser +# =============================================================================== +def parse_apdu_line(line: str) -> Optional[bytes]: + """Parse a line from APDU replay file. + + Handles two formats: + 1. Raw hex string: "e0040001ff..." + 2. Direction prefix: "=> e0040001ff..." or "<= 9000" + + Args: + line: Line from the APDU file + + Returns: + bytes: Parsed APDU bytes, or None if should be skipped + """ + line = line.strip().lower() + + if not line: + return None + + # Check for direction prefix + if line.startswith("=>"): + # Outgoing APDU (to device) - extract hex data + hex_data = line[2:].strip() + try: + return bytes.fromhex(hex_data) + except ValueError: + logger.warning(f"Invalid hex data in outgoing APDU: {line}") + return None + if line.startswith("<="): + # Incoming APDU (from device) - skip response + logger.debug(f"Skipping response: {line}") + return None + + # Raw hex string (legacy format) - parse directly + try: + return bytes.fromhex(line) + except ValueError: + logger.warning(f"Invalid hex data: {line}") + return None + + +# =============================================================================== +# Main entry +# =============================================================================== +def main() -> None: + parser = init_parser() + args = parser.parse_args() + + set_logging(args.verbose) + + # Load selector cache at startup + load_selector_cache() + logger.debug(f"Loaded {len(LOCAL_SELECTORS)} function selectors from cache") + + try: + with open(args.input, encoding='utf-8') as f: + for line in f.readlines(): + # Parse APDU line + apdu_bytes = parse_apdu_line(line) + + # Skip if parsing failed or it's a response + if apdu_bytes is None: + continue + + if to_int(apdu_bytes[0]) != APP_CLA: + logger.debug(f"Skipping non-Ethereum APDU (CLA={apdu_bytes[0]:02x})") + continue + + # Extract INS byte + ins = apdu_bytes[1] + + # Dispatch to appropriate decoder + if ins == InsType.SIGN: + decode_sign_apdu(apdu_bytes) + elif ins == InsType.PROVIDE_ERC20_TOKEN_INFORMATION: + decode_erc20_token_apdu(apdu_bytes) + elif ins == InsType.PROVIDE_TRANSACTION_INFO: + decode_tlv_apdu(apdu_bytes, decode_struct_information_tlv, "Transaction Info Struct") + elif ins == InsType.PROVIDE_TRANSACTION_FIELD_DESC: + decode_tlv_apdu(apdu_bytes, decode_struct_field_tlv, "Transaction Field Struct") + elif ins == InsType.PROVIDE_ENUM_VALUE: + decode_tlv_apdu(apdu_bytes, decode_enum_value_tlv, "Enum Value Struct") + elif ins == InsType.PROVIDE_PROXY_INFO: + decode_tlv_apdu(apdu_bytes, decode_tlv, "Proxy Info Struct") + elif ins == InsType.PROVIDE_NETWORK_INFORMATION: + decode_tlv_apdu(apdu_bytes, decode_tlv, "Network Info Struct") + elif ins == InsType.EIP712_SEND_STRUCT_DEF: + decode_eip712_struct_def_apdu(apdu_bytes) + elif ins == InsType.EIP712_SEND_STRUCT_IMPL: + decode_eip712_struct_impl_apdu(apdu_bytes) + elif ins == InsType.EIP712_SEND_FILTERING: + decode_eip712_filtering_apdu(apdu_bytes) + elif ins == InsType.EIP712_SIGN: + decode_sign_eip712_apdu(apdu_bytes) + else: + logger.info(f"*** Skipping INS type: {ins:02x} -> {InsType(ins).name}") + except FileNotFoundError: + logger.error(f"Failed to open file: {args.input}") + return 1 + except Exception as e: + logger.error(f"Unexpected error: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/tools/generate_selector_cache.py b/tools/generate_selector_cache.py new file mode 100644 index 000000000..c17fc1cfa --- /dev/null +++ b/tools/generate_selector_cache.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +"""Generate function selector cache from ABI files.""" + +import json +import logging +from pathlib import Path +from typing import Dict, Optional +from eth_utils import keccak + + +def calculate_function_selector(signature: str) -> str: + """Calculate function selector from signature. + + Args: + signature: Function signature like "transfer(address,uint256)" + + Returns: + 8-character hex string of the selector + """ + signature = signature.strip() + hash_bytes = keccak(text=signature) + selector = hash_bytes[:4].hex() + return selector + + +def parse_abi_type(abi_type: dict) -> str: + """Parse ABI type definition to string representation. + + Args: + abi_type: ABI type object with 'type', 'components', etc. + + Returns: + String representation like "address", "uint256", "(address,uint256)[]" + """ + base_type = abi_type['type'] + + # Handle tuple types (structs) + if base_type.startswith('tuple'): + if 'components' not in abi_type: + return base_type + + # Build tuple signature recursively + component_types = [parse_abi_type(comp) for comp in abi_type['components']] + tuple_sig = f"({','.join(component_types)})" + + # Handle arrays of tuples + if base_type.endswith('[]'): + return f"{tuple_sig}[]" + else: + return tuple_sig + + return base_type + + +def extract_function_signature(func: dict) -> str: + """Extract function signature from ABI function definition. + + Args: + func: ABI function object + + Returns: + Function signature like "transfer(address,uint256)" + """ + name = func['name'] + + if 'inputs' not in func or not func['inputs']: + return f"{name}()" + + # Parse input types + input_types = [parse_abi_type(input_param) for input_param in func['inputs']] + + return f"{name}({','.join(input_types)})" + + +def process_abi_file(abi_path: Path, logger: Optional[logging.Logger]) -> Dict[str, str]: + """Process a single ABI file and extract all function signatures. + + Args: + abi_path: Path to ABI JSON file + + Returns: + Dictionary mapping selector to signature + """ + + try: + with open(abi_path, 'r') as f: + abi = json.load(f) + except Exception as e: + logger.error(f"Failed to load {abi_path}: {e}") + return {} + + + selectors = {} + + for item in abi: + # Only process function definitions + if item.get('type') != 'function': + continue + + # Extract signature + signature = extract_function_signature(item) + + # Calculate selector + selector = calculate_function_selector(signature) + + # Store in cache + if selector in selectors and selectors[selector] != signature: + logger.warning(f"Selector collision: {selector}") + logger.warning(f" Existing: {selectors[selector]}") + logger.warning(f" New: {signature}") + + selectors[selector] = signature + + return selectors + + +def scan_abi_directory(directory: Path, logger: Optional[logging.Logger]) -> Dict[str, str]: + """Scan directory for ABI files and extract all signatures. + + Args: + directory: Directory containing ABI JSON files + + Returns: + Dictionary mapping selector to signature + """ + all_selectors = {} + + # Find all JSON files + json_files = list(directory.glob('*.json')) + list(directory.glob('**/*.json')) + + if not json_files: + logger.warning(f"No JSON files found in {directory}") + return {} + + for abi_path in sorted(json_files): + selectors = process_abi_file(abi_path, logger) + + # Merge with existing selectors + for selector, signature in selectors.items(): + if selector in all_selectors and all_selectors[selector] != signature: + logger.warning(f"Duplicate selector {selector} from {abi_path.name}") + logger.warning(f" Keeping: {all_selectors[selector]}") + logger.warning(f" Ignoring: {signature}") + else: + all_selectors[selector] = signature + + return all_selectors + + +def gen_selector_cache(input_path: Path = Path("tests/ragger/abis"), logger: Optional[logging.Logger] = None) -> Dict[str, str]: + # Validate input directory + assert input_path.exists() + + assert input_path.is_dir() + + # Scan ABI files + return scan_abi_directory(input_path, logger) diff --git a/tools/requirements.txt b/tools/requirements.txt new file mode 100644 index 000000000..a2af1f03f --- /dev/null +++ b/tools/requirements.txt @@ -0,0 +1,5 @@ +eth_abi +eth_utils +eth-hash[pycryptodome] +requests +rlp