diff --git a/docs/source/install.rst b/docs/source/install.rst index 7911e107..61ecc891 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -29,10 +29,11 @@ Once you see the following msg, then you're all set:: -d, --detail TEXT Show detail report. Optionally specify the name of a rule/label -o, --output FILE Output report in JSON + -w, --webreport FILE Generate web report -a, --apk FILE APK file [required] -r, --rule PATH Rules directory [default: - /Users/$USER/.quark-engine/quark-rules] - -g, --graph Create call graph to call_graph_image + /home/$USER/.quark-engine/quark-rules] + -g, --graph [png|json] Create call graph to call_graph_image directory -c, --classification Show rules classification -t, --threshold [100|80|60|40|20] @@ -46,10 +47,10 @@ Once you see the following msg, then you're all set:: --core-library [androguard|rizin] Specify the core library used to analyze an APK - --multi-process INTEGER RANGE Allow analyzing APK with N processes, - where N doesn't exceeds the number of usable CPUs - 1 - to avoid memory exhaustion. - --version Show the version and exit. + --multi-process INTEGER RANGE Allow analyzing APK with N processes, where + N doesn't exceeds the number of usable CPUs + - 1 to avoid memory exhaustion. [x>=1] + --version Show the version and exit. --help Show this message and exit. To learn how to scan multiple samples in a directory, please have a look at :ref:`Directory Scanning ` diff --git a/docs/source/testing.rst b/docs/source/testing.rst index cd396867..546b0263 100644 --- a/docs/source/testing.rst +++ b/docs/source/testing.rst @@ -15,7 +15,6 @@ Then we could test one of the apk in `apk-malware-samples` by the rules `quark-r $ quark -a Ahmyth.apk -s - Running in Docker ================= @@ -36,3 +35,18 @@ You may also interactively use quark in the docker container. For example: (in-docker): /app/quark# cd /tmp (in-docker)::/tmp# quark -a Ahmyth.apk -s +Running analyses based on Rizin (Upcoming unstable feature) +=========================================================== + +Now Quark also supports `Rizin`_ as one of our Android analysis frameworks. You can use option ``--core-library`` with ``rizin`` to enable the Rizin-based analysis library. + +.. _`Rizin`: https://github.com/rizinorg/rizin + +.. code-block:: bash + + quark -a Ahmyth.apk -s --core-library rizin + + +For now, Quark is compatible with Rizin v0.3.4. But, users don't have to installed a Rizin with that version. Quark provides a feature to automatically setup a independent Rizin. In this way, the dependency will not conflict with your environment. Type in the above command and let Quark handle everything else for you. + +If there is a working installation of Rizin installed in the system, Quark will check its version to determine if it is compatible. If true, Quark automatically uses it for the analysis. Otherwise, Quark uses the independent Rizin installed in the Quark directory (``~/.quark-engine``). diff --git a/quark/cli.py b/quark/cli.py index d596000b..269dd2a4 100644 --- a/quark/cli.py +++ b/quark/cli.py @@ -18,6 +18,7 @@ from quark.utils.colors import yellow from quark.utils.graph import select_label_menu, show_comparison_graph from quark.utils.pprint import print_info, print_success, print_warning +from quark.utils.tools import find_rizin from quark.utils.weight import Weight from quark.webreport.generate import ReportGenerator @@ -141,8 +142,8 @@ "--multi-process", "num_of_process", type=click.IntRange(min=1), - help="Allow analyzing APK with N processes, where N doesn't exceeds" + - " the number of usable CPUs - 1 to avoid memory exhaustion.", + help="Allow analyzing APK with N processes, where N doesn't exceeds" + + " the number of usable CPUs - 1 to avoid memory exhaustion.", required=False, default=1, ) @@ -169,6 +170,11 @@ def entry_point( rule_buffer_list = [] rule_filter = summary or detail + if core_library.lower() == "rizin": + rizin_path = find_rizin() + else: + rizin_path = "" + # Determine the location of rules if rule_filter and rule_filter.endswith("json"): if not os.path.isfile(rule_filter): @@ -224,9 +230,14 @@ def entry_point( malware_confidences = {} for apk_ in apk: data = ( - ParallelQuark(apk_, core_library, num_of_process) + ParallelQuark( + apk_, + core_library, + num_of_process, + rizin_path, + ) if num_of_process > 1 - else Quark(apk_, core_library) + else Quark(apk_, core_library, rizin_path) ) all_labels = {} # dictionary containing @@ -281,9 +292,9 @@ def entry_point( # Load APK data = ( - ParallelQuark(apk[0], core_library, num_of_process) + ParallelQuark(apk[0], core_library, num_of_process, rizin_path) if num_of_process > 1 - else Quark(apk[0], core_library) + else Quark(apk[0], core_library, rizin_path) ) if label: @@ -410,7 +421,8 @@ def entry_point( json_report = data.get_json_report() report_html = ReportGenerator( - json_report).get_analysis_report_html() + json_report + ).get_analysis_report_html() if ".html" not in webreport: webreport = f"{webreport}.html" diff --git a/quark/config.py b/quark/config.py index a2481915..6bafd2de 100644 --- a/quark/config.py +++ b/quark/config.py @@ -9,5 +9,8 @@ DIR_PATH = f"{HOME_DIR}quark-rules" DEBUG = False +COMPATIBLE_RAZIN_VERSIONS = ["v0.3.4"] + +RIZIN_DIR = f"{HOME_DIR}rizin/" Path(HOME_DIR).mkdir(parents=True, exist_ok=True) diff --git a/quark/core/axmlreader/__init__.py b/quark/core/axmlreader/__init__.py index b040951d..6141415f 100644 --- a/quark/core/axmlreader/__init__.py +++ b/quark/core/axmlreader/__init__.py @@ -1,8 +1,8 @@ import enum import functools import os.path -import pkg_resources +import pkg_resources import rzpipe # Resource Types Definition @@ -71,7 +71,19 @@ class AxmlReader(object): A Class that parses the Android XML file """ - def __init__(self, file_path, structure_path=None): + def __init__(self, file_path, structure_path=None, rizin_path=None): + """ + Create an AxmlReader object to parse the given Android XML file like + AndroidManifest.xml. + + :param file_path: a file in the Android XML format + :param structure_path: a plain text file defining the structures of + the Android XML format. Defaults to None + :param rizin_path: a PathLike object to specify a Rizin executable to + use. Defaults to None + :raises AxmlException: if the given file is an invalid Android XML + file + """ if structure_path is None: structure_path = pkg_resources.resource_filename( "quark.core.axmlreader", "axml_definition" @@ -83,7 +95,7 @@ def __init__(self, file_path, structure_path=None): f" of Rizin in {structure_path}" ) - self._rz = rzpipe.open(file_path) + self._rz = rzpipe.open(file_path, rizin_home=rizin_path) self._rz.cmd(f"pfo {structure_path}") self._file_size = int(self._rz.cmd("i~size[1]"), 16) diff --git a/quark/core/parallelquark.py b/quark/core/parallelquark.py index 57247232..bf391ed3 100644 --- a/quark/core/parallelquark.py +++ b/quark/core/parallelquark.py @@ -2,8 +2,8 @@ # This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine # See the file 'LICENSE' for copying permission. -from multiprocessing.pool import Pool from multiprocessing import cpu_count +from multiprocessing.pool import Pool from quark.core.analysis import QuarkAnalysis from quark.core.quark import Quark @@ -13,12 +13,33 @@ class ParallelQuark(Quark): @staticmethod - def _worker_initializer(apk, core_library): + def _worker_initializer(apk, core_library, rizin_path): + """ + An initializer that creates multiple Quark object for a subprocess + pool. + + :param apk: an APK for Quark to analyze + :param core_library: a string indicating which analysis library Quark + should use + :param rizin_path: a PathLike object to specify a Rizin executable for + the Rizin-based analysis library + """ + global _quark - _quark = Quark(apk, core_library) + _quark = Quark(apk, core_library, rizin_path) @staticmethod def _worker_analysis(rule_obj): + """ + A function for the subprocesses in the pool to analyze the target APK + with the specified rule. + + :param rule_obj: a Quark rule that the subprocess will analyze + :return: a tuple of the analysis result, including the reached stage, + the parent functions, the detected behavior (rule), and the + corresponding bytecodes. + """ + _quark.quark_analysis = QuarkAnalysis() _quark.run(rule_obj) @@ -32,7 +53,8 @@ def to_raw_method(methodobject): reached_stage = rule_obj.check_item.count(True) level_4_result = tuple( - to_raw_method(method) for method in _quark.quark_analysis.level_4_result + to_raw_method(method) + for method in _quark.quark_analysis.level_4_result ) behavior_list = [ ( @@ -51,6 +73,13 @@ def to_raw_method(methodobject): ) def _apply_analysis_result(self, rule_obj): + """ + Append the result returned by the subprocesses into the QuarkAnalysis + object. + + :param rule_obj: a Quark rule specifying which result this method + should append + """ async_result = self._result_map[id(rule_obj)] result = async_result.get() @@ -112,21 +141,43 @@ def _apply_analysis_result(self, rule_obj): ] ) - def __init__(self, apk, core_library, num_of_process=1): + def __init__(self, apk, core_library, num_of_process=1, rizin_path=None): + """ + Create a ParallelQuark object to run the Quark analysis parallelly. + + :param apk: an APK for Quark to analyze + :param core_library: a string indicating which analysis library Quark + should use + :param num_of_process: a value indicating the maximal number of + available processes for the analysis. Defaults to 1 + :param rizin_path: a PathLike object to specify a Rizin executable for + the Rizin-based analysis library. Default to None + """ self._result_map = {} self._pool = Pool( - min(num_of_process, cpu_count() - 1), self._worker_initializer, - (apk, core_library) + min(num_of_process, cpu_count() - 1), + self._worker_initializer, + (apk, core_library, rizin_path), ) - super().__init__(apk, core_library) + super().__init__(apk, core_library, rizin_path=rizin_path) def apply_rules(self, rule_obj_list): + """ + Add Quark rules to this pool. + + :param rule_obj_list: a list of Quark rules + """ for rule_obj in rule_obj_list: result = self._pool.apply_async(self._worker_analysis, (rule_obj,)) self._result_map[id(rule_obj)] = result def run(self, rule_obj): + """ + Wait for all rules to be analyzed. + + :param rule_obj: _description_ + """ self._apply_analysis_result(rule_obj) def close(self): diff --git a/quark/core/quark.py b/quark/core/quark.py index 455240e2..b0f2fa66 100644 --- a/quark/core/quark.py +++ b/quark/core/quark.py @@ -41,14 +41,20 @@ class Quark: """Quark module is used to check quark's five-stage theory""" - def __init__(self, apk, core_library="androguard"): + def __init__(self, apk, core_library="androguard", rizin_path=None): """ - - :param apk: the filename of the apk. + Create a Quark object. + + :param apk: an APK for Quark to analyze + :param core_library: a string indicating which analysis library Quark + should use. Defaults to "androguard" + :param rizin_path: a PathLike object to specify a Rizin executable for + the Rizin-based analysis library. Defaults to None + :raises ValueError: if an unknown core library is specified """ core_library = core_library.lower() if core_library == "rizin": - self.apkinfo = RizinImp(apk) + self.apkinfo = RizinImp(apk, rizin_path=rizin_path) elif core_library == "androguard": self.apkinfo = AndroguardImp(apk) else: diff --git a/quark/core/rzapkinfo.py b/quark/core/rzapkinfo.py index 446ab39a..d14f168d 100644 --- a/quark/core/rzapkinfo.py +++ b/quark/core/rzapkinfo.py @@ -18,7 +18,8 @@ from quark.core.interface.baseapkinfo import BaseApkinfo from quark.core.struct.bytecodeobject import BytecodeObject from quark.core.struct.methodobject import MethodObject -from quark.utils.tools import descriptor_to_androguard_format, remove_dup_list +from quark.utils.tools import (descriptor_to_androguard_format, + _get_rizin_version, remove_dup_list) RizinCache = namedtuple("rizin_cache", "address dexindex is_imported") @@ -51,9 +52,21 @@ def __init__( self, apk_filepath: Union[str, PathLike], tmp_dir: Union[str, PathLike] = None, + rizin_path: PathLike = None, ): super().__init__(apk_filepath, "rizin") + if rizin_path: + if not _get_rizin_version(rizin_path): + raise ValueError( + f"The file in {rizin_path} is not a valid Rizin" + + " executable." + ) + + rizin_path = os.path.dirname(rizin_path) + + self.rizin_path = rizin_path + if self.ret_type == "DEX": self._tmp_dir = None self._dex_list = [apk_filepath] @@ -84,7 +97,7 @@ def __init__( @functools.lru_cache def _get_rz(self, index): - rz = rzpipe.open(self._dex_list[index]) + rz = rzpipe.open(self._dex_list[index], rizin_home=self.rizin_path) rz.cmd("aa") return rz @@ -227,7 +240,7 @@ def _get_methods_classified(self, dexindex): @functools.cached_property def permissions(self) -> List[str]: - axml = AxmlReader(self._manifest) + axml = AxmlReader(self._manifest, rizin_path=self.rizin_path) permission_list = set() for tag in axml: diff --git a/quark/report.py b/quark/report.py index 2b6f94b4..cafa9c88 100644 --- a/quark/report.py +++ b/quark/report.py @@ -6,6 +6,11 @@ from quark.core.quark import Quark from quark.core.struct.ruleobject import RuleObject +from quark.utils.tools import ( + find_rizin_in_PATH, + find_rizin_in_configuration_folder, +) +from quark.config import COMPATIBLE_RAZIN_VERSIONS class Report: @@ -13,20 +18,48 @@ class Report: This module is for users who want to use quark as a Python module. """ - def __init__(self): - self.quark = None + def __init__(self, rizin_path=None, disable_rizin_installation=False): + """ + Create a Report object. - def analysis(self, apk, rule, core_library="androguard"): + :param rizin_path: a PathLike object to specify a Rizin executable for + the Rizin-based analysis library. Defaults to None + :param disable_rizin_installation: a flag to disable the automatic + installation of Rizin. Defaults to False. Defaults to False """ - The main function of Quark-Engine analysis, the analysis is based on the provided APK file. + self.quark = None + self.rizin_path = rizin_path + self.disable_rizin_installation = disable_rizin_installation - :param core_library: the library to analysis binary - :param apk: the APK file - :param rule: the rule to be checked, it could be a directory or a single json rule + def analysis(self, apk, rule, core_library="androguard", rizin_path=None): + """ + The main function of Quark-Engine analysis, the analysis is based on + the provided APK file. + + :param apk: an APK for Quark to analyze + :param rule: a Quark rule that will be used in the analysis. It could + be a directory or a Quark rule + :param core_library: a string indicating which analysis library Quark + should use. Defaults to "androguard" + :param rizin_path: a PathLike object to specify a Rizin executable for + the Rizin-based analysis library. Defaults to None :return: None """ - self.quark = Quark(apk, core_library) + if core_library.lower() == "rizin": + if rizin_path: + self.rizin_path = rizin_path + elif not self.rizin_path: + self.rizin_path = find_rizin_in_PATH(COMPATIBLE_RAZIN_VERSIONS) + if not self.rizin_path: + self.rizin_path = find_rizin_in_configuration_folder( + COMPATIBLE_RAZIN_VERSIONS + ) + + if not self.rizin_path: + raise ValueError("Cannot found a valid Rizin executable.") + + self.quark = Quark(apk, core_library, self.rizin_path) if os.path.isdir(rule): diff --git a/quark/utils/pprint.py b/quark/utils/pprint.py index 81197143..7fe8ee58 100644 --- a/quark/utils/pprint.py +++ b/quark/utils/pprint.py @@ -3,8 +3,14 @@ # See the file 'LICENSE' for copying permission. from prettytable import PrettyTable +from quark.utils.colors import bold, cyan, green, red, yellow -from quark.utils.colors import bold, cyan, yellow, red, green + +def clear_the_last_line(): + """ + Clear the last line of the terminal. + """ + print("\033[A\033[A") def print_info(message): diff --git a/quark/utils/tools.py b/quark/utils/tools.py index 62b1833c..0c782de8 100644 --- a/quark/utils/tools.py +++ b/quark/utils/tools.py @@ -3,7 +3,23 @@ # See the file 'LICENSE' for copying permission. import copy +import os.path import re +import shutil +from os import F_OK, PathLike, access, mkdir +from subprocess import ( # nosec + PIPE, + STDOUT, + CalledProcessError, + Popen, + check_output, +) +from typing import List, Tuple +from xmlrpc.client import Boolean + +from click import confirm, prompt +from quark.config import COMPATIBLE_RAZIN_VERSIONS, RIZIN_DIR +from quark.utils.pprint import clear_the_last_line, print_error, print_info def remove_dup_list(element): @@ -79,8 +95,10 @@ def filter_api_by_usage_count(data, api_pool, percentile_rank=0.2): statistic_result[str(api)] = api_called_count str_statistic_result[str(api)] = api - sorted_key = {k: v for k, v in sorted( - statistic_result.items(), key=lambda item: item[1])} + sorted_key = { + k: v + for k, v in sorted(statistic_result.items(), key=lambda item: item[1]) + } sorted_result = {k: v for k, v in sorted(sorted_key.items())} threshold = len(api_pool) * percentile_rank @@ -94,3 +112,274 @@ def filter_api_by_usage_count(data, api_pool, percentile_rank=0.2): S_set.append(str_statistic_result[api]) return P_set, S_set + + +def _execute_command(command, stderr=PIPE, cwd=None): + """ + Execute a given command and yield the messages from the standard output. + + :param command: a list of strings which is the command to execute + :param cwd: a PathLike object which is the working directory. Defaults to + None + :raises subprocess.CalledProcessError: if the process terminates with a + non-zero return code + :yield: a string holding a line of message in the standard output + """ + process = Popen( # nosec + command, + bufsize=1, + stdout=PIPE, + stderr=stderr, + universal_newlines=True, + cwd=cwd, + ) + + line = "" + while True: + char = process.stdout.read(1) + if char == "\n" or char == "\r": + clear_the_last_line() + yield line + line = "" + continue + + elif char == "": + break + + line = line + char + + process.stdout.close() + return_code = process.wait() + + if return_code: + error_messages = "" + if stderr == PIPE: + for message in process.stderr.readlines(): + error_messages = error_messages + message + + raise CalledProcessError(return_code, command, stderr=error_messages) + + if stderr == PIPE: + process.stderr.close() + + +def _get_rizin_version(executable_path) -> str: + """ + Get the version number of the Rizin instance in the path. + + :param rizin_path: a path to the Rizin executable + :return: the version number of the Rizin instance + """ + try: + result = check_output([executable_path, "-v"], timeout=5) # nosec + result = str(result) + + matched_versions = re.finditer( + r"[0-9]+\.[0-9]+\.[0-9]+", result[: result.index("@")] + ) + first_matched = next(matched_versions, None) + + if first_matched: + return "v" + first_matched.group(0) + else: + return None + + except CalledProcessError: + return None + + except OSError: + return None + + +def download_rizin(target_path) -> Boolean: + """ + Download the source code of Rizin into the specified path. If a file or + folder already exists, this function will remove them. + + :param target_path: a PathLike object specifying the location to save the + downloaded files + :return: a boolean indicating if the operation finishes without errors + """ + if access(target_path, F_OK): + shutil.rmtree(target_path) + mkdir(target_path) + + try: + print() + + for line in _execute_command( + [ + "git", + "clone", + "--progress", + "https://github.com/rizinorg/rizin", + target_path, + ], + stderr=STDOUT, + ): + print_info(line) + + return True + + except CalledProcessError: + print_error("An error occurred when downloading Rizin.\n") + + except OSError: + print_error("An error occurred when downloading Rizin.\n") + + return False + + +def update_rizin(source_path, tag) -> bool: + """ + Checkout the specified commit in the Rizin repository. Then, compile the + source code to build a Rizin executable. + + :param source_path: a PathLike object specifying the location to the + source code + :param target_commit: a hash value representing a valid commit in the + repository + :return: a boolean indicating the operation is success or not + """ + + def _print_error(error: CalledProcessError): + error_output = error.stderr + if isinstance(error_output, (bytes, bytearray)): + error_output = error_output.decode() + + for line in error_output.splitlines(): + print_error(line) + + try: + print() + + # Checkout to target commit + for line in _execute_command( + ["git", "checkout", tag], cwd=source_path + ): + print_info(line) + + # Remove the last build + for line in _execute_command(["rm", "-rf", "build"], cwd=source_path): + print_info(line) + + # Clean out old subprojects + for line in _execute_command( + ["git", "clean", "-dxff", "subprojects/"], cwd=source_path + ): + print_info(line) + + except CalledProcessError as error: + _print_error(error) + return False + + except OSError as error: + print_error("An error occurred when updating Rizin.\n") + print_error(error) + return False + + # Compile Rizin + try: + print() + + # Configure + for line in _execute_command( + ["meson", "--buildtype=release", "build"], cwd=source_path + ): + print_info(line) + + # Compile the source code + for line in _execute_command( + ["meson", "compile", "-C", "build"], cwd=source_path + ): + print_info(line) + + return True + + except CalledProcessError as error: + _print_error(error) + + except OSError as error: + print_error("an error occurred when building rizin.\n") + print_error(error) + + return False + + +def find_rizin_in_PATH(compatible_versions: List[str]) -> PathLike: + """Search the system variable, PATH, to find an appropriate Rizin + executable. + + :param compatible_versions: python list containing compatible Rizin + versions + :return: path to the Rizin executable + """ + executable_path = shutil.which("rizin") + if executable_path: + version = _get_rizin_version(executable_path) + if version in compatible_versions: + return executable_path + + +def find_rizin_in_configuration_folder( + compatible_versions: List[str], +) -> Tuple[str, str]: + """Search the configuration folder of Quark (~/.quark-engine) to find + an appropriate Rizin executable. + + :param compatible_versions: python list containing compatible Rizin + versions + :return: path to the Rizin executable + """ + executable_path = RIZIN_DIR + "build/binrz/rizin/rizin" + if os.path.exists(executable_path): + version = _get_rizin_version(executable_path) + if version in compatible_versions: + return executable_path, "ready" + else: + return executable_path, "outdated" + + return None, "Not found" + + +def find_rizin() -> PathLike: + """ + Search the system PATH and the configuration folder of Quark + (~/.quark-engine) to find an appropriate Rizin executable. If none of them + are usable, this method will ask users to specify one. + + :return: path to an Rizin executable + """ + + compatible_versions = COMPATIBLE_RAZIN_VERSIONS + recommend_version = compatible_versions[0] + + # Search Rizin in Path + executable_path = find_rizin_in_PATH(compatible_versions) + if executable_path: + return executable_path + + # Otherwise, search the configuration folder of Quark + executable_path, state = find_rizin_in_configuration_folder( + compatible_versions + ) + if executable_path: + if state == "outdated": + update_rizin(RIZIN_DIR, recommend_version) + return executable_path + elif state == "ready": + return executable_path + + # Ask if the user is willing to install Rizin + install_rizin = confirm( + f"Do you want to install Rizin {recommend_version}?", show_default=True + ) + if install_rizin: + # download_rizin(RIZIN_DIR) + update_rizin(RIZIN_DIR, recommend_version) + + return os.path.join(RIZIN_DIR, "build", "binrz", "rizin", "rizin") + + # Otherwise, ask for the path to a Rizin executable + executable_path = prompt("Please specify a path to the Rizin executable") + return executable_path diff --git a/tests/test_report.py b/tests/test_report.py index ec56efe8..baeb5981 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -1,11 +1,11 @@ import os import os.path +import shutil import zipfile from unittest.mock import patch import pytest import requests - from quark.report import Report @@ -19,7 +19,8 @@ def invalid_file(tempfile): @pytest.fixture(scope="module") def sample_apk_file(): APK_SOURCE = ( - "https://github.com/quark-engine/" "apk-malware-samples/raw/master/Ahmyth.apk" + "https://github.com/quark-engine/" + "apk-malware-samples/raw/master/Ahmyth.apk" ) APK_NAME = "Ahmyth.apk" @@ -130,6 +131,70 @@ def test_analysis_with_rule_directory( assert mock_run.call_count == num_of_rules assert mock_generate_report.call_count == num_of_rules + @staticmethod + def test_analysis_without_specified_rizin_path( + sample_apk_file, sample_rule_directory + ): + expected_path = shutil.which("rizin") + + with patch("quark.core.quark.Quark.run") as mock_run: + with patch( + "quark.core.quark.Quark.generate_json_report" + ) as mock_generate_report: + sample_report = Report() + sample_report.analysis( + sample_apk_file, + sample_rule_directory, + core_library="rizin", + ) + + assert sample_report.rizin_path == expected_path + + mock_run.assert_called_once() + mock_generate_report.assert_called_once() + + @staticmethod + def test_analysis_with_specified_rizin_path( + sample_apk_file, sample_rule_directory + ): + rizin_path = shutil.which("rizin") + + with patch("quark.core.quark.Quark.run") as mock_run: + with patch( + "quark.core.quark.Quark.generate_json_report" + ) as mock_generate_report: + with patch( + "quark.utils.tools.find_rizin_in_PATH" + ) as mock_find_rizin: + + sample_report = Report() + sample_report.analysis( + sample_apk_file, + sample_rule_directory, + core_library="rizin", + rizin_path=rizin_path, + ) + + assert sample_report.rizin_path == rizin_path + + mock_find_rizin.assert_not_called() + mock_run.assert_called_once() + mock_generate_report.assert_called_once() + + @staticmethod + def test_analysis_with_invalid_rizin_path( + sample_report, sample_apk_file, sample_rule_directory + ): + invalid_path = "INVALID_PATH" + + with pytest.raises(ValueError): + sample_report.analysis( + sample_apk_file, + sample_rule_directory, + core_library="rizin", + rizin_path=invalid_path, + ) + def test_get_report_with_invalid_type(self, sample_report): with pytest.raises(ValueError): sample_report.get_report(None) diff --git a/tests/utils/test_tools.py b/tests/utils/test_tools.py index 656089bc..baee7031 100644 --- a/tests/utils/test_tools.py +++ b/tests/utils/test_tools.py @@ -1,12 +1,59 @@ -import pytest +import os +import re +import shutil +from subprocess import ( # nosec B404 + PIPE, + CalledProcessError, + check_output, + run, +) +from unittest.mock import patch +import pytest +from quark import config from quark.utils.tools import ( + _get_rizin_version, contains, descriptor_to_androguard_format, + download_rizin, + find_rizin_in_configuration_folder, + find_rizin_in_PATH, remove_dup_list, + update_rizin, ) +@pytest.fixture(scope="module") +def rizin_in_system_path(): + path = shutil.which("rizin") + assert path + + return path + + +@pytest.fixture(scope="module") +def version_of_rizin_installed_on_system(): + rizin_in_system_path = shutil.which("rizin") + try: + process = run( # nosec + [rizin_in_system_path, "-v"], timeout=5, check=True, stdout=PIPE + ) + result = str(process.stdout) + + matched_versions = re.finditer( + r"[0-9]+\.[0-9]+\.[0-9]+", result[: result.index("@")] + ) + first_matched = next(matched_versions, None) + + assert first_matched + + return "v" + first_matched.group(0) + except TimeoutError: + assert False + except CalledProcessError: + assert False + + def test_remove_dup_list_with_invalid_arg(): with pytest.raises(TypeError): remove_dup_list(123) @@ -112,3 +159,129 @@ def test_descriptor_to_androguard_format_with_combination(): result = descriptor_to_androguard_format(descriptor) assert result == "(I Ljava/lang/String; [B J)" + + +def test_get_rizin_version_with_valid_path( + rizin_in_system_path, version_of_rizin_installed_on_system +): + expected_version = version_of_rizin_installed_on_system + + found_version = _get_rizin_version(rizin_in_system_path) + + assert found_version == expected_version + + +def test_get_rizin_version_with_invalid_path(tmp_path): + assert not _get_rizin_version(tmp_path) + + +def test_download_rizin_successfully(tmp_path): + target_path = tmp_path / "rizin" + + with patch("quark.utils.tools._execute_command") as mock: + download_rizin(target_path) + mock.assert_called_once() + + +def test_fail_to_download_rizin_due_to_unavailable_network(tmp_path): + target_path = tmp_path / "rizin" + + with patch("quark.utils.tools._execute_command") as mock: + mock.side_effect = CalledProcessError( + "1", + "mock command", + stderr="fatal: unable to access " + + "'https://github.com/rizinorg/rizin/'.", + ) + + assert not download_rizin(target_path) + + +def test_fail_to_download_rizin_due_to_unknown_errors(tmp_path): + target_path = tmp_path / "rizin" + + with patch("quark.utils.tools._execute_command") as mock: + mock.side_effect = CalledProcessError("1", "mock command", stderr=b"") + + assert not download_rizin(target_path) + + +def test_update_rizin(tmp_path): + target_path = tmp_path / "rizin" + target_version_tag = config.COMPATIBLE_RAZIN_VERSIONS[0] + + download_rizin(target_path) + + update_rizin(target_path, target_version_tag) + current_tag = ( + check_output( # nosec + ["git", "describe", "--tags"], + cwd=target_path, + ) + .decode() + .strip() + ) + + assert current_tag == target_version_tag + assert os.access( + target_path / "build" / "binrz" / "rizin" / "rizin", os.F_OK | os.X_OK + ) + + +def test_fail_to_update_rizin_due_to_any_errors(tmp_path): + target_path = tmp_path / "rizin" + target_version_tag = config.COMPATIBLE_RAZIN_VERSIONS[0] + + with patch("subprocess.Popen") as mock: + mock.side_effect = CalledProcessError( + "1", "mock command", stderr=b"Error message" + ) + + assert not update_rizin(target_path, target_version_tag) + + +def test_find_rizin_in_path(rizin_in_system_path): + rizin_path = find_rizin_in_PATH(config.COMPATIBLE_RAZIN_VERSIONS) + assert rizin_path == rizin_in_system_path + + +def test_find_rizin_in_configuration_folder(): + expected_executable_path = config.RIZIN_DIR + "build/binrz/rizin/rizin" + + with patch("os.path.exists") as mocked_exists: + mocked_exists.return_value = True + + with patch( + "quark.utils.tools._get_rizin_version" + ) as mocked_get_version: + mocked_get_version.return_value = config.COMPATIBLE_RAZIN_VERSIONS[ + 0 + ] + + executable_path, state = find_rizin_in_configuration_folder( + config.COMPATIBLE_RAZIN_VERSIONS + ) + + assert executable_path == expected_executable_path + assert state == "ready" + mocked_get_version.assert_called_once_with(executable_path) + + +def test_find_outdated_rizin_in_configuration_directory(): + expected_executable_path = config.RIZIN_DIR + "build/binrz/rizin/rizin" + + with patch("os.path.exists") as mocked_exists: + mocked_exists.return_value = True + + with patch( + "quark.utils.tools._get_rizin_version" + ) as mocked_get_version: + mocked_get_version.return_value = "Outdated or broken version" + + executable_path, state = find_rizin_in_configuration_folder( + config.COMPATIBLE_RAZIN_VERSIONS + ) + + assert executable_path == expected_executable_path + assert state == "outdated" + mocked_get_version.assert_called_once_with(executable_path)