diff --git a/fuzz_utils/converters/EchidnaConverter.py b/fuzz_utils/converters/EchidnaConverter.py new file mode 100644 index 0000000..ee02973 --- /dev/null +++ b/fuzz_utils/converters/EchidnaConverter.py @@ -0,0 +1,334 @@ +""" Converts Echidna call sequences into the Medusa format""" +import os +import json +import copy +from typing import Any, NoReturn +from collections import defaultdict +from web3 import Web3 + +from slither import Slither +from slither.core.declarations.contract import Contract +from slither.core.declarations.function_contract import FunctionContract +from slither.core.solidity_types.elementary_type import ElementaryType +from slither.core.solidity_types.user_defined_type import UserDefinedType +from slither.core.solidity_types.array_type import ArrayType +from slither.core.declarations.structure import Structure +from slither.core.declarations.structure_contract import StructureContract +from slither.core.declarations.enum import Enum +from fuzz_utils.utils.crytic_print import CryticPrint + +from fuzz_utils.utils.encoding import parse_echidna_byte_string +from fuzz_utils.utils.encoding import strip_hex_leading_zero +from fuzz_utils.utils.error_handler import handle_exit + +# pylint: disable=too-many-instance-attributes +class EchidnaConverter: + """Converts call sequences from a corpus into an Echidna or Medusa format""" + + __MEDUSA_CALL_FORMAT = { + "call": { + "from": "", + "to": "", + "nonce": 0, + "value": "", + "gasLimit": 12500000, + "gasPrice": "0x1", + "gasFeeCap": "0x0", + "gasTipCap": "0x0", + "data": "", + "dataAbiValues": {"methodName": "", "inputValues": []}, + "AccessList": None, + "SkipAccountChecks": False, + }, + "blockNumberDelay": 0, + "blockTimestampDelay": 0, + } + + def __init__( + self, + target_name: str, + corpus_path: str, + gas_fee_cap: str, + gas_tip_cap: str, + slither: Slither, + target_address: str, + deployer: str, + ) -> None: + self.corpus_path = corpus_path + self.gas_fee_cap = gas_fee_cap + self.gas_tip_cap = gas_tip_cap + self.slither = slither + self.target_name = target_name + self.target = self.get_target_contract() + self.w3 = Web3(Web3.EthereumTesterProvider()) + self.contract = self.w3.eth.contract( + abi=self.target.file_scope.abi( + self.target.compilation_unit._crytic_compile_compilation_unit, self.target.name + ) + ) + self.target_address = target_address + self.deployer = deployer + + def convert(self) -> tuple[list, list, list, list]: + """Parses corpus files and returns lists of converted files and file names""" + converted_coverage = [] + converted_reproducers = [] + ( + coverage_list, + reproducers_list, + coverage_file_names, + reproducers_file_names, + ) = self._fetch_corpus() + + for coverage in coverage_list: + single_file = self._parse_corpus_file(coverage) + converted_coverage.append(single_file) + + for reproducer in reproducers_list: + single_file = self._parse_corpus_file(reproducer) + converted_reproducers.append(single_file) + + return ( + converted_coverage, + converted_reproducers, + coverage_file_names, + reproducers_file_names, + ) + + def get_target_contract(self) -> Contract: + """Finds and returns Slither Contract""" + contracts = self.slither.get_contract_from_name(self.target_name) + # Loop in case slither fetches multiple contracts for some reason (e.g., similar names?) + for contract in contracts: + if contract.name == self.target_name: + return contract + + # TODO throw error if no contract found + handle_exit(f"\n* Slither could not find the specified contract `{self.target_name}`.") + + def _parse_corpus_file(self, calls: list) -> list: + users_nonces = defaultdict(lambda: 0) + # NOTE: This is a hack to get around a Medusa error, won't work when deploying multiple contracts + users_nonces[self.deployer] = 1 + medusa_call_list = [] + for _, call in enumerate(calls): + _, call_str = self._parse_call_object(call, users_nonces) + medusa_call_list.append(call_str) + + return medusa_call_list + + def _parse_call_object(self, call: dict, nonces: dict) -> tuple[dict, str]: + """Converts a Echidna call dict into a Medusa call dict""" + parsed_call: dict = copy.deepcopy(self.__MEDUSA_CALL_FORMAT) + + if call["call"]["tag"] == "NoCall": + # TODO figure out how Medusa does empty calls + return {}, "" + # The sender, receiver, and value are encoded the same in both formats + # So we can just copy it over to our template dict + parsed_call["call"]["from"] = call["src"] + parsed_call["call"]["to"] = self.target_address if self.target_address else call["dst"] + parsed_call["call"]["value"] = strip_hex_leading_zero(call["value"]) + parsed_call["call"]["nonce"] = nonces[call["src"]] + # Update sender nonce + nonces[call["src"]] += 1 + parsed_call["call"]["gasLimit"] = call["gas"] + parsed_call["call"]["gasPrice"] = strip_hex_leading_zero(call["gasprice"]) + # TODO check if these are the correct indexes + parsed_call["blockTimestampDelay"] = int(call["delay"][0], 16) + parsed_call["blockNumberDelay"] = int(call["delay"][1], 16) + # Echidna does not have a concept of gasFeeCap and gasTipCap in the call sequences + # Since these values seem to be configurable only once per campaign (i.e., they should be the same for the whole corpus) + # we can safely use the default or make it configurable via CLI flags + if self.gas_fee_cap: + parsed_call["call"]["gasFeeCap"] = self.gas_fee_cap + if self.gas_tip_cap: + parsed_call["call"]["gasTipCap"] = self.gas_tip_cap + # AccessList: null + # SkipAccountChecks: false + + # dataAbiValues: {"methodName": "", "inputValues": []} + method_name = call["call"]["contents"][0] + parsed_call["call"]["dataAbiValues"]["methodName"] = method_name + + slither_entry_point: FunctionContract + + for entry_point in self.target.functions_entry_points: + if entry_point.name == method_name: + slither_entry_point = entry_point + + # Parse function input + function_inputs = call["call"]["contents"][1] + parsed_inputs = self._decode_function_params(function_inputs, False, slither_entry_point) + parsed_call["call"]["dataAbiValues"]["inputValues"] = copy.deepcopy(parsed_inputs) + + _, types, _ = slither_entry_point.signature + for idx, input_type in enumerate(types): + if "bytes" in input_type: + parsed_inputs[idx] = Web3.to_bytes(text=parsed_inputs[idx]) + + parsed_call["call"]["data"] = self.contract.encodeABI( + fn_name=method_name, args=parsed_inputs + ) + + return parsed_call, json.dumps(parsed_call) + + def _decode_function_params( + self, function_params: list, recursive: bool, entry_point: Any + ) -> list: + params = [] + index = 0 + # 1. Get a list of function parameters in hex, and their types + # 2. Decode the types and add to the input list + for param_idx, param in enumerate(function_params): + input_parameter = None + if recursive: + if isinstance(entry_point, list): + input_parameter = entry_point[param_idx].type + else: + input_parameter = entry_point.type + + else: + input_parameter = entry_point.parameters[param_idx].type + + match input_parameter: + case ElementaryType(): # type: ignore[misc] + params.append(self._match_elementary_types(param)) # type: ignore[unreachable] + case ArrayType(): # type: ignore[misc] + inputs, new_index = self._match_array_type( # type: ignore[unreachable,unpacking-non-sequence] + param, index, input_parameter + ) + params.append(inputs) + index = new_index + case UserDefinedType(): # type: ignore[misc] + func_params = self._match_user_defined_type(param, input_parameter) # type: ignore[unreachable, unpacking-non-sequence] + params.append(func_params) + case _: + # TODO should handle all cases, but keeping this just in case + CryticPrint().print_information( + f"\n* Attempted to decode an unidentified type {input_parameter}, this call will be skipped. Please open an issue at https://github.com/crytic/fuzz-utils/issues" + ) + continue + return params + + # pylint: disable=R0201 + def _match_elementary_types(self, param: dict) -> Any | NoReturn: + """ + Returns a string which represents a elementary type literal value. e.g. "5" or "uint256(5)" + + Parameters: + param (dict): A dictionary containing information about the function parameter + recursive (int): A boolean, determining if the elementary type should be casted + e.g., "uint256(5)", or returned as is, e.g., "5". + + Returns: + (str): String of the input parameter literal + """ + match param["tag"]: + case "AbiBool": + # Represented as True or False, needs to be lowercased to work + return param["contents"] + case "AbiUInt" | "AbiInt": + # The numbers are represented as [format (e.g., 8 for uint8), "number" (string of number)] + # So I can just append the second element string + return int(param["contents"][1]) + case "AbiAddress": + return param["contents"] + case "AbiBytes" | "AbiBytesDynamic": + is_fixed_size = isinstance(param["contents"], list) + contents = param["contents"][1] if is_fixed_size else param["contents"] + + # Haskell encoding needs to be stripped and then converted to a hex literal + return parse_echidna_byte_string(contents.strip('"')) + case "AbiString": + # TODO Strings are not converted correctly, they're pure hex for now + hex_string = parse_echidna_byte_string(param["contents"].strip('"')) + return bytes.fromhex(hex_string).decode("utf-8") + case _: + handle_exit( + f"\n* The parameter tag `{param['tag']}` could not be found in the call object. This could indicate an issue in decoding the call sequence, or a missing feature. Please open an issue at https://github.com/crytic/fuzz-utils/issues" + ) + + def _match_array_type( + self, param: dict, index: int, input_parameter: Any + ) -> tuple[list, int] | NoReturn: + match param["tag"]: + case "AbiArray": + # Consider cases where the array items are more complex types (bytes, string, tuples) + func_params = self._decode_function_params( + param["contents"][2], True, input_parameter + ) + return func_params, index + case "AbiArrayDynamic": + # Consider cases where the array items are more complex types (bytes, string, tuples) + func_params = self._decode_function_params( + param["contents"][1], True, input_parameter + ) + index += 1 + + return func_params, index + case _: + handle_exit( + f"\n* The parameter tag `{param['tag']}` could not be found in the call object. This could indicate an issue in decoding the call sequence, or a missing feature. Please open an issue at https://github.com/crytic/fuzz-utils/issues" + ) + + def _match_user_defined_type(self, param: dict, input_parameter: Any) -> str | NoReturn: + match param["tag"]: + case "AbiTuple": + match input_parameter.type: + case Structure() | StructureContract(): # type: ignore[misc] + entry_point = input_parameter.type.elems_ordered # type: ignore[unreachable] + func_params = self._decode_function_params( # type: ignore[unreachable] + param["contents"], True, entry_point + ) + struct_params = {} + for param_idx, parameter in enumerate(func_params): + field_parameter = entry_point[param_idx].name + struct_params[f"{field_parameter}"] = parameter + + return struct_params + case _: + handle_exit( + f"\n* The parameter type `{input_parameter.type}` could not be found. This could indicate an issue in decoding the call sequence, or a missing feature. Please open an issue at https://github.com/crytic/fuzz-utils/issues" + ) + case "AbiUInt": + if isinstance(input_parameter.type, Enum): + enum_uint = self._match_elementary_types(param) + return enum_uint + + # TODO is this even reachable? + handle_exit( + f"\n* The parameter type `{input_parameter.type}` does not match the intended type `Enum`. This could indicate an issue in decoding the call sequence, or a missing feature. Please open an issue at https://github.com/crytic/fuzz-utils/issues" + ) + case _: + handle_exit( + f"\n* The parameter tag `{param['tag']}` could not be found in the call object. This could indicate an issue in decoding the call sequence, or a missing feature. Please open an issue at https://github.com/crytic/fuzz-utils/issues" + ) + + # Load the corpus and reproducers one by one + def _fetch_corpus(self) -> tuple[list, list, list, list]: + """Fetches the Echidna coverage and reproducers from the corpus and returns two lists""" + coverage_list: list = [] + reproducers_list: list = [] + + coverage_path = os.path.join(self.corpus_path, "coverage") + reproducers_path = os.path.join(self.corpus_path, "reproducers") + + coverage_file_names, coverage_list = self._parse_files(coverage_path) + reproducers_file_names, reproducers_list = self._parse_files(reproducers_path) + + return coverage_list, reproducers_list, coverage_file_names, reproducers_file_names + + def _parse_files(self, path: str) -> tuple[list, list]: + """Parses all files in a directory and returns a list""" + name_list = [] + file_list = [] + for entry in os.listdir(path): + full_path = os.path.join(path, entry) + + if os.path.isfile(full_path): + with open(full_path, "r", encoding="utf8") as file: + file_list.append(json.load(file)) + name_list.append(entry.split(".")[0]) + + return name_list, file_list ## diff --git a/fuzz_utils/converters/__init__.py b/fuzz_utils/converters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fuzz_utils/main.py b/fuzz_utils/main.py index 2b93933..b14dad6 100644 --- a/fuzz_utils/main.py +++ b/fuzz_utils/main.py @@ -13,6 +13,7 @@ from fuzz_utils.templates.foundry_templates import templates from fuzz_utils.fuzzers.Medusa import Medusa from fuzz_utils.fuzzers.Echidna import Echidna +from fuzz_utils.converters.EchidnaConverter import EchidnaConverter from fuzz_utils.utils.error_handler import handle_exit @@ -49,7 +50,7 @@ def get_target_contract(self) -> Contract: # TODO throw error if no contract found sys.exit(-1) - def create_poc(self) -> str: + def create_poc(self, no_save: bool) -> str: """Takes in a directory path to the echidna reproducers and generates a test file""" file_list = [] @@ -78,90 +79,224 @@ def create_poc(self) -> str: inheritance_path = f"{self.inheritance_path}{self.target_name}" # 5. Save the test file - test_file_str = template.render( - file_path=f"{inheritance_path}.sol", - target_name=self.target_name, - amount=0, - tests=tests_list, - fuzzer=self.fuzzer.name, - ) - with open(f"{write_path}_{self.fuzzer.name}_Test.t.sol", "w", encoding="utf-8") as outfile: - outfile.write(test_file_str) + if not no_save: + test_file_str = template.render( + file_path=f"{inheritance_path}.sol", + target_name=self.target_name, + amount=0, + tests=tests_list, + fuzzer=self.fuzzer.name, + ) + with open( + f"{write_path}_{self.fuzzer.name}_Test.t.sol", "w", encoding="utf-8" + ) as outfile: + outfile.write(test_file_str) + CryticPrint().print_success( + f"Generated a test file in {write_path}_{self.fuzzer.name}_Test.t.sol" + ) + return test_file_str + CryticPrint().print_success( - f"Generated a test file in {write_path}_{self.fuzzer.name}_Test.t.sol" + "No files were saved. File saving is turned off using the --no-save flag" ) - return test_file_str + return "" +# pylint: disable=too-many-locals,too-many-branches,too-many-statements def main() -> None: # type: ignore[func-returns-value] """The main entry point""" parser = argparse.ArgumentParser( prog="fuzz-utils", description="Generate test harnesses for Echidna failed properties." ) - parser.add_argument("file_path", help="Path to the Echidna test harness.") - parser.add_argument( + subparsers = parser.add_subparsers(dest="command", help="sub-command help") + + # The command parser for generating unit tests + parser_generate = subparsers.add_parser( + "generate", help="Generate unit tests from fuzzer corpora sequences." + ) + parser_generate.add_argument("file_path", help="Path to the Echidna/Medusa test harness.") + parser_generate.add_argument( "-cd", "--corpus-dir", dest="corpus_dir", help="Path to the corpus directory", required=True ) - parser.add_argument("-c", "--contract", dest="target_contract", help="Define the contract name") - parser.add_argument( + parser_generate.add_argument( + "-c", "--contract", dest="target_contract", help="Define the contract name" + ) + parser_generate.add_argument( "-td", "--test-directory", dest="test_directory", help="Define the directory that contains the Foundry tests.", ) - parser.add_argument( + parser_generate.add_argument( "-i", "--inheritance-path", dest="inheritance_path", help="Define the relative path from the test directory to the directory src/contracts directory.", ) - parser.add_argument( + parser_generate.add_argument( "-f", "--fuzzer", dest="selected_fuzzer", help="Define the fuzzer used. Valid inputs: 'echidna', 'medusa'", ) - parser.add_argument( + parser_generate.add_argument( "--version", help="displays the current version", version=require("fuzz-utils")[0].version, action="version", ) + parser_generate.add_argument( + "--no-save", + dest="no_save", + action="store_true", + ) - args = parser.parse_args() - - missing_args = [arg for arg, value in vars(args).items() if value is None] - if missing_args: - parser.print_help() - handle_exit(f"\n* Missing required arguments: {', '.join(missing_args)}") + # The command parser for converting between corpus formats + parser_convert = subparsers.add_parser( + "convert", help="Convert a Echidna/Medusa corpus into the other format." + ) + parser_convert.add_argument("file_path", help="Path to the Echidna/Medusa test harness.") + parser_convert.add_argument( + "-cd", "--corpus-dir", dest="corpus_dir", help="Path to the corpus directory", required=True + ) + parser_convert.add_argument( + "-c", "--contract", dest="target_contract", required=True, help="Define the contract name" + ) + parser_convert.add_argument( + "-f", + "--fuzzer", + dest="selected_fuzzer", + required=True, + help="Define the fuzzer used. Valid inputs: 'echidna', 'medusa'", + ) + parser_convert.add_argument( + "-gfc", + "--gas-fee-cap", + dest="gas_fee_cap", + help="Define the gasFeeCap to be used in Medusa corpora.", + ) + parser_convert.add_argument( + "-gtc", + "--gas-tip-cap", + dest="gas_tip_cap", + help="Define the gasTipCap to be used in Medusa corpora.", + ) + parser_convert.add_argument( + "-o", + "--output-dir", + dest="output_dir", + help="Define the output directory where the result will be saved.", + ) + parser_convert.add_argument( + "-t", + "--target-address", + dest="target_address", + help="Define the address of the target contract. Needed because Medusa and Echidna deploy contracts to different addresses", + ) + parser_convert.add_argument( + "-d", + "--deployer", + dest="deployer", + help="Define the address of the Echidna/Medusa deployer.", + ) + args = parser.parse_args() + selected_fuzzer = args.selected_fuzzer.lower() file_path = args.file_path corpus_dir = args.corpus_dir - test_directory = args.test_directory - inheritance_path = args.inheritance_path target_contract = args.target_contract slither = Slither(file_path) - fuzzer: Echidna | Medusa - - match args.selected_fuzzer.lower(): - case "echidna": - fuzzer = Echidna(target_contract, corpus_dir, slither) - case "medusa": - fuzzer = Medusa(target_contract, corpus_dir, slither) - case _: - handle_exit( - f"\n* The requested fuzzer {args.selected_fuzzer} is not supported. Supported fuzzers: echidna, medusa." - ) - CryticPrint().print_information( - f"Generating Foundry unit tests based on the {fuzzer.name} reproducers..." - ) - foundry_test = FoundryTest( - inheritance_path, target_contract, corpus_dir, test_directory, slither, fuzzer - ) - foundry_test.create_poc() - CryticPrint().print_success("Done!") + if args.command == "generate": + test_directory = args.test_directory + inheritance_path = args.inheritance_path + + fuzzer: Echidna | Medusa + + match selected_fuzzer: + case "echidna": + fuzzer = Echidna(target_contract, corpus_dir, slither) + case "medusa": + fuzzer = Medusa(target_contract, corpus_dir, slither) + case _: + handle_exit( + f"\n* The requested fuzzer {selected_fuzzer} is not supported. Supported fuzzers: echidna, medusa." + ) + + CryticPrint().print_information( + f"Generating Foundry unit tests based on the {fuzzer.name} reproducers..." + ) + foundry_test = FoundryTest( + inheritance_path, target_contract, corpus_dir, test_directory, slither, fuzzer + ) + foundry_test.create_poc(no_save=args.no_save) + CryticPrint().print_success("Done!") + elif args.command == "convert": + corpus_dir = args.corpus_dir + output_dir = args.output_dir + match selected_fuzzer: + case "echidna": + CryticPrint().print_information( + f"Converting Echidna corpus {corpus_dir} to Medusa format..." + ) + converter = EchidnaConverter( + target_contract, + corpus_dir, + args.gas_fee_cap, + args.gas_tip_cap, + slither, + args.target_address, + args.deployer, + ) + ( + converted_coverage, + converted_reproducers, + coverage_file_names, + reproducers_file_names, + ) = converter.convert() + + # TODO Check if output dir exists + if not os.path.exists(output_dir): + os.makedirs(output_dir) + CryticPrint().print_success(f"{output_dir} created!") + + if not os.path.exists(output_dir + "/call_sequences"): + os.makedirs(output_dir + "/call_sequences/immutable") + os.makedirs(output_dir + "/call_sequences/mutable") + + if not os.path.exists(output_dir + "/test_results"): + os.makedirs(output_dir + "/test_results") + + # Check if "coverage" and "reproducers" dirs exist + + for idx, file_name in enumerate(coverage_file_names): + coverage_file = "[" + ",".join(converted_coverage[idx]) + "]" + with open( + f"{output_dir}/call_sequences/mutable/{file_name}.json", + "w", + encoding="utf-8", + ) as outfile: + outfile.write(coverage_file) + with open( + f"{output_dir}/call_sequences/immutable/{file_name}.json", + "w", + encoding="utf-8", + ) as outfile: + outfile.write(coverage_file) + + for idx, file_name in enumerate(reproducers_file_names): + reproducer_file = "[" + ",".join(converted_reproducers[idx]) + "]" + with open( + f"{output_dir}/test_results/{file_name}.json", "w", encoding="utf-8" + ) as outfile: + outfile.write(reproducer_file) + + CryticPrint().print_success(f"Medusa corpus was saved to {output_dir}!") + case _: + pass + else: + parser.print_help() if __name__ == "__main__": diff --git a/fuzz_utils/utils/encoding.py b/fuzz_utils/utils/encoding.py index c81f3d5..106ec91 100644 --- a/fuzz_utils/utils/encoding.py +++ b/fuzz_utils/utils/encoding.py @@ -48,18 +48,27 @@ def parse_echidna_byte_string(s: str) -> str: for key, value in ascii_escape_map.items(): s = s.replace(key, value) - # Handle octal escapes (like \\135) - def octal_to_byte(match: re.Match) -> str: - octal_value = match.group(0)[1:] # Remove the backslash + # Handle unicode escapes (like \\135) + def unicode_to_byte(match: re.Match) -> str: + unicode_value = match.group(0)[1:] # Remove the backslash - return chr(int(octal_value, 8)) + return chr(int(unicode_value, 10)) - s = re.sub(r"\\[0-3]?[0-7][0-7]", octal_to_byte, s) + s = re.sub(r"\\[0-3]?[0-9][0-9]", unicode_to_byte, s) # Convert to bytes and then to hexadecimal - return s.encode().hex() + return s.encode("latin-1").hex() def parse_medusa_byte_string(s: str) -> str: """Decode bytes* or string type from Medusa format to Solidity hex literal""" return s.encode("utf-8").hex() + + +def strip_hex_leading_zero(hex_str: str) -> str: + """Strips the leading zeroes from a hex string""" + stripped = hex_str[2:].lstrip("0") + if len(stripped) == 0: + return "0x0" + + return "0x" + stripped diff --git a/pyproject.toml b/pyproject.toml index 35ed648..a4f4629 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,9 @@ requires-python = ">=3.10" dependencies = [ "colorama>=0.4.0", "slither_analyzer>=0.10.0", - "jinja2>=3.1.0" + "jinja2>=3.1.0", + "web3>=6.0.0", + "eth_tester>=v0.10.0-b.1" ] [project.optional-dependencies] diff --git a/tests/conftest.py b/tests/conftest.py index e855b09..bb989fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,11 +25,11 @@ def __init__(self, target: str, target_path: str, corpus_dir: str): def echidna_generate_tests(self) -> None: """Runs the fuzz-utils tool for an Echidna corpus""" - self.echidna_generator.create_poc() + self.echidna_generator.create_poc(False) def medusa_generate_tests(self) -> None: """Runs the fuzz-utils tool for a Medusa corpus""" - self.medusa_generator.create_poc() + self.medusa_generator.create_poc(False) @pytest.fixture(autouse=True) # type: ignore[misc]