diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index a2fc5a73..e8e6b0dc 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -26,7 +26,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install pytest rzpipe meson==0.62.0 ninja coverage ciphey frida objection + python -m pip install pytest rzpipe meson==0.62.0 ninja coverage ciphey frida objection r2pipe==1.8.0 # Install graphviz & ninja sudo apt-get -y install graphviz ninja-build @@ -39,6 +39,14 @@ jobs: sudo ninja -C build install sudo ldconfig -v cd - + + # Install Radare2 (5.8.8) + sudo apt install -y musl-tools + sudo git clone https://github.com/radareorg/radare2 /opt/radare2/ + cd /opt/radare2/ + sudo git checkout 5.8.8 + sudo sys/install.sh + cd - # Install click >= 8.0.0 for CLI supports python -m pip install click==8.0.3 diff --git a/Pipfile b/Pipfile index 6b90bf93..eed1bf14 100644 --- a/Pipfile +++ b/Pipfile @@ -25,6 +25,7 @@ rzpipe = "<=0.1.2" objection = "<=1.11.0" frida = "<=15.2.2" ciphey = ">=5.0.0,<=5.14.0" +r2pipe = "==1.8.0" [requires] python_version = "3.8" diff --git a/README.md b/README.md index e26bf401..3480441d 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ * [CWE-088](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-88-in-android-application-vuldroid-apk) Improper Neutralization of Argument Delimiters in a Command * [CWE-089](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-89-in-android-application-androgoat-apk) Improper Neutralization of Special Elements used in an SQL Command ('SQL Injection') * [CWE-094](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-94-in-android-application-ovaa-apk) Improper Control of Generation of Code ('Code Injection') +* [CWE-117](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-117-in-android-application-allsafe-apk) Improper Output 
Neutralization for Logs * [CWE-295](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-295-in-android-application-insecureshop-apk) Improper Certificate Validation * [CWE-312](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-312-in-android-application-ovaa-apk) Cleartext Storage of Sensitive Information * [CWE-319](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-319-in-android-application-ovaa-apk) Cleartext Transmission of Sensitive Information @@ -74,7 +75,8 @@ * [CWE-798](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-798-in-android-application-ovaa-apk) Use of Hard-coded Credentials * [CWE-921](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-921-in-android-application-ovaa-apk) Storage of Sensitive Data in a Mechanism without Access Control * [CWE-925](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-925-in-android-application-insecurebankv2-androgoat) Improper Verification of Intent by Broadcast Receiver -* [CWE-926](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-926-in-android-application-dvba-apk) Improper Export of Android Application Components +* [CWE-926](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-926-in-android-application-dvba-apk) Improper Export of Android Application Components +* [CWE-940](https://quark-engine.readthedocs.io/en/latest/quark_script.html#detect-cwe-940-in-android-application-ovaa-vuldroid) Improper Verification of Source of a Communication Channel # Quick Start diff --git a/docs/source/index.rst b/docs/source/index.rst index a652d7f8..795ec13c 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -20,6 +20,7 @@ This guide will explain how to set up Quark, use it, and customize it. 
install_index quark_script + visual_quark_script_program quark_mit_program quark_reports addRules diff --git a/docs/source/quark_script.rst b/docs/source/quark_script.rst index b1afca74..df17d075 100644 --- a/docs/source/quark_script.rst +++ b/docs/source/quark_script.rst @@ -2081,3 +2081,158 @@ Quark Script Result $ python3 CWE-78.py CWE-78 is detected in method, Lcom/vuldroid/application/RootDetection; onCreate (Landroid/os/Bundle;)V + + +Detect CWE-117 in Android Application (allsafe.apk) +------------------------------------------------------ +This scenario seeks to find **Improper Output Neutralization for Logs**. See `CWE-117 <https://cwe.mitre.org/data/definitions/117.html>`_ for more details. + +Let’s use this `APK `_ and the above APIs to show how the Quark script finds this vulnerability. + +First, we design a detection rule ``writeContentToLog.json`` to spot the behavior that uses a method to write contents to the log file. + +Then, we use ``behaviorInstance.getParamValues()`` to get all parameter values of this method. And we check if these parameters contain keywords of APIs for neutralization, such as escape, replace, format, and setFilter. + +If the answer is **NO**, that may result in secret context leakage into the log file, or the attacker may perform log forging attacks. + +Quark Script CWE-117.py +========================== + +.. 
code-block:: python + + from quark.script import Rule, runQuarkAnalysis + + SAMPLE_PATH = "allsafe.apk" + RULE_PATH = "writeContentToLog.json" + KEYWORDS_FOR_NEUTRALIZATION = ["escape", "replace", "format", "setFilter"] + + ruleInstance = Rule(RULE_PATH) + quarkResult = runQuarkAnalysis(SAMPLE_PATH, ruleInstance) + + for logOutputBehavior in quarkResult.behaviorOccurList: + + secondAPIParam = logOutputBehavior.getParamValues()[1] + + isKeywordFound = False + for keyword in KEYWORDS_FOR_NEUTRALIZATION: + if keyword in secondAPIParam: + isKeywordFound = True + break + + if not isKeywordFound: + print(f"CWE-117 is detected in method,{secondAPIParam}") + +Quark Rule: writeContentToLog.json +============================================== + +.. code-block:: json + + { + "crime": "Write contents to the log.", + "permission": [], + "api": [ + { + "descriptor": "()Landroid/text/Editable;", + "class": "Lcom/google/android/material/textfield/TextInputEditText;", + "method": "getText" + }, + { + "descriptor": "(Ljava/lang/String;Ljava/lang/String;)I", + "class": "Landroid/util/Log;", + "method": "d" + } + ], + "score": 1, + "label": [] + } + +Quark Script Result +====================== +- **allsafe.apk** + +.. 
code-block:: TEXT + + $ python CWE-117.py + CWE-117 is detected in method,Ljava/lang/StringBuilder;->toString()Ljava/lang/String;(Ljava/lang/StringBuilder;->append(Ljava/lang/String;)Ljava/lang/StringBuilder;(Ljava/lang/StringBuilder;->append(Ljava/lang/String;)Ljava/lang/StringBuilder;(Ljava/lang/StringBuilder;-><init>()V(Ljava/lang/StringBuilder;),User entered secret: ),Ljava/lang/Object;->toString()Ljava/lang/String;(Lcom/google/android/material/textfield/TextInputEditText;->getText()Landroid/text/Editable;()))) + +Detect CWE-940 in Android Application (ovaa,Vuldroid) +------------------------------------------------------ +This scenario aims to demonstrate the detection of the **Improper Verification of Source of a Communication Channel** vulnerability using `ovaa.apk `_ and `Vuldroid.apk `_. See `CWE-940 <https://cwe.mitre.org/data/definitions/940.html>`_ for more details. + +To begin with, we create a detection rule named ``LoadUrlFromIntent.json`` to identify behavior that loads a URL from intent data into the WebView. + +Next, we retrieve the methods that pass the URL. Following this, we check if these methods are only for setting intent, such as findViewById, getStringExtra, or getIntent. + +If **YES**, it could imply that the APK uses communication channels without proper verification, which may cause CWE-940 vulnerability. + +Quark Script CWE-940.py +========================== + +The Quark Script below uses ovaa.apk to demonstrate. You can change the ``SAMPLE_PATH`` to the sample you want to detect. For example, ``SAMPLE_PATH = "Vuldroid.apk"``. + + +.. 
code-block:: python + + from quark.script import runQuarkAnalysis, Rule + + SAMPLE_PATH = "ovaa.apk" + RULE_PATH = "LoadUrlFromIntent.json" + + INTENT_SETTING_METHODS = [ + "findViewById", + "getStringExtra", + "getIntent", + ] + + ruleInstance = Rule(RULE_PATH) + + quarkResult = runQuarkAnalysis(SAMPLE_PATH, ruleInstance) + + for behaviorInstance in quarkResult.behaviorOccurList: + methodsInArgs = behaviorInstance.getMethodsInArgs() + + verifiedMethodCandidates = [] + + for method in methodsInArgs: + if method.methodName not in INTENT_SETTING_METHODS: + verifiedMethodCandidates.append(method) + + if verifiedMethodCandidates == []: + caller = behaviorInstance.methodCaller.fullName + print(f"CWE-940 is detected in method, {caller}") + + + +Quark Rule: LoadUrlFromIntent.json +============================================== + +.. code-block:: json + + { + "crime": "Load Url from Intent and open WebView", + "permission": [], + "api": [ + { + "class": "Landroid/content/Intent;", + "method": "getStringExtra", + "descriptor": "(Ljava/lang/String;)Ljava/lang/String;" + }, + { + "class": "Landroid/webkit/WebView;", + "method": "loadUrl", + "descriptor": "(Ljava/lang/String;)V" + } + ], + "score": 1, + "label": [] + } + +Quark Script Result +====================== +- **ovaa.apk** + +.. code-block:: TEXT + + $ python CWE-940.py + CWE-940 is detected in method, Loversecured/ovaa/activities/WebViewActivity; onCreate (Landroid/os/Bundle;)V + diff --git a/docs/source/visual_quark_script_program.rst b/docs/source/visual_quark_script_program.rst new file mode 100644 index 00000000..9e8e2a28 --- /dev/null +++ b/docs/source/visual_quark_script_program.rst @@ -0,0 +1,22 @@ +++++++++++++++++++++++++++++ +Visual Quark Script Program +++++++++++++++++++++++++++++ + +Introduction of the Program +---------------------------- + +Quark Script is a powerful tool for detecting and analyzing mobile security. 
However, it can be complex and challenging for users who are not familiar with programming. To overcome this challenge, we are pleased to announce our plan to develop a visual programming tool that simplifies the Quark Script organization process, making it easy for anyone to organize Quark Script using a simple UI. + +Goal of the Program +-------------------- + +We aim to make Quark Script programming accessible to everyone and remove the barriers that often come with traditional programming languages. So we design a new visual tool for Quark Script organization. It would be more intuitive, time-saving, and effort-saving for users, even if they are not familiar with programming. + +Web Design Layout +------------------ + +The initial draft of the web design is shown below. + +Design by: `@Commuter95 `_ + +.. image:: https://github.com/quark-engine/quark-engine/assets/16009212/053d62e2-181a-4fb1-96d7-95fe59809dc3 diff --git a/quark/__init__.py b/quark/__init__.py index f5194672..cf2cc658 100644 --- a/quark/__init__.py +++ b/quark/__init__.py @@ -1 +1 @@ -__version__ = "23.7.1" +__version__ = "23.8.1" diff --git a/quark/cli.py b/quark/cli.py index 6f5a6e28..6d06ca97 100644 --- a/quark/cli.py +++ b/quark/cli.py @@ -133,7 +133,7 @@ "--core-library", "core_library", help="Specify the core library used to analyze an APK", - type=click.Choice(("androguard", "rizin"), case_sensitive=False), + type=click.Choice(("androguard", "rizin", "radare2"), case_sensitive=False), required=False, default="androguard", ) diff --git a/quark/core/axmlreader/__init__.py b/quark/core/axmlreader/__init__.py index a062a37e..d8f7dd98 100644 --- a/quark/core/axmlreader/__init__.py +++ b/quark/core/axmlreader/__init__.py @@ -8,6 +8,7 @@ import pkg_resources import rzpipe +import r2pipe # Resource Types Definition # Please reference to @@ -84,7 +85,7 @@ class AxmlReader(object): A Class that parses the Android XML file """ - def __init__(self, file_path, structure_path=None): + def 
__init__(self, file_path, core_library="rizin", structure_path=None): if structure_path is None: structure_path = pkg_resources.resource_filename( "quark.core.axmlreader", "axml_definition" @@ -96,10 +97,13 @@ def __init__(self, file_path, structure_path=None): f" of Rizin in {structure_path}" ) - self._rz = rzpipe.open(file_path) - self._rz.cmd(f"pfo {structure_path}") + if core_library == "rizin": + self._core = rzpipe.open(file_path) + else: + self._core = r2pipe.open(file_path) + self._core.cmd(f"pfo {structure_path}") - self._file_size = int(self._rz.cmd("i~size[1]"), 16) + self._file_size = int(self._core.cmd("i~size[1]"), 16) self._ptr = 0 self._cache = {} @@ -110,7 +114,7 @@ def __init__(self, file_path, structure_path=None): raise AxmlException("Filesize exceeds theoretical lower bound.") # File Header - header = self._rz.cmdj("pfj axml_ResChunk_header @ 0x0") + header = self._core.cmdj("pfj axml_ResChunk_header @ 0x0") self._data_type = header[0]["value"] self._axml_size = header[2]["value"] @@ -133,7 +137,7 @@ def __init__(self, file_path, structure_path=None): return # String Pool - string_pool_header = self._rz.cmdj("pfj axml_ResStringPool_header @ 8") + string_pool_header = self._core.cmdj("pfj axml_ResStringPool_header @ 8") string_pool_size = string_pool_header[0]["value"][2]["value"] @@ -163,18 +167,18 @@ def __init__(self, file_path, structure_path=None): self._stringCount = string_pool_header[1]["value"] stringStart = string_pool_header[4]["value"] - self._rz.cmd(f"f string_pool_header @ 0x8 ") + self._core.cmd(f"f string_pool_header @ 0x8 ") string_pool_index = header_size + self._ptr - self._rz.cmd(f"f string_pool_index @ { string_pool_index }") + self._core.cmd(f"f string_pool_index @ { string_pool_index }") string_pool_data = stringStart + self._ptr - self._rz.cmd(f"f string_pool_data @ { string_pool_data }") + self._core.cmd(f"f string_pool_data @ { string_pool_data }") self._ptr += string_pool_size if self._ptr >= self._axml_size: return # 
Resource Map (Optional) - header = self._rz.cmdj(f"pfj axml_ResChunk_header @ {self._ptr}") + header = self._core.cmdj(f"pfj axml_ResChunk_header @ {self._ptr}") header_type = header[0]["value"] if header_type == RES_XML_RESOURCE_MAP_TYPE: @@ -201,7 +205,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: :yield: header of a resource chunk defined in the binary """ while self._axml_size - self._ptr >= 16: - header = self._rz.cmdj(f"pfj axml_ResXMLTree_node @ {self._ptr}") + header = self._core.cmdj(f"pfj axml_ResXMLTree_node @ {self._ptr}") node_type = header[0]["value"][0]["value"] header_size = header[0]["value"][1]["value"] @@ -224,7 +228,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: chunk = {"Address": self._ptr, "Type": node_type} if node_type == RES_XML_START_ELEMENT_TYPE: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_attrExt @ { ext_ptr }" ) @@ -235,7 +239,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: # node['AttrCount'] = ext[4]['value'] elif node_type == RES_XML_END_ELEMENT_TYPE: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_endElementExt @ { ext_ptr }" ) @@ -246,7 +250,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: RES_XML_START_NAMESPACE_TYPE, RES_XML_END_NAMESPACE_TYPE, ]: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_namespaceExt @ { ext_ptr }" ) @@ -254,7 +258,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: chunk["Uri"] = ext[1]["value"][0]["value"] elif node_type == RES_XML_CDATA_TYPE: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_cdataExt @ { ext_ptr }" ) @@ -281,7 +285,7 @@ def get_string(self, index): if index < 0 or index >= self._stringCount: return None - return self._rz.cmdj( + return self._core.cmdj( f"pfj Z @ string_pool_data + `pfv n4 " f"@ string_pool_index+ {index}*4` + 2" )[0]["string"] @@ -296,14 +300,14 @@ def get_attributes(self, chunk: ResChunkHeader) -> List[ResValue]: return None extAddress = 
int(chunk["Address"]) + 16 - attrExt = self._rz.cmdj(f"pfj axml_ResXMLTree_attrExt @ {extAddress}") + attrExt = self._core.cmdj(f"pfj axml_ResXMLTree_attrExt @ {extAddress}") attrAddress = extAddress + attrExt[2]["value"] attributeSize = attrExt[3]["value"] attributeCount = attrExt[4]["value"] attributes = [] for _ in range(attributeCount): - attr = self._rz.cmdj( + attr = self._core.cmdj( f"pfj axml_ResXMLTree_attribute @ {attrAddress}" ) @@ -402,6 +406,6 @@ def get_xml_tree(self) -> XMLElementTree: def __del__(self): try: - self._rz.quit() + self._core.quit() except BaseException: pass diff --git a/quark/core/quark.py b/quark/core/quark.py index 4ecab5e1..423f1acf 100644 --- a/quark/core/quark.py +++ b/quark/core/quark.py @@ -14,6 +14,7 @@ from quark.core.analysis import QuarkAnalysis from quark.core.apkinfo import AndroguardImp from quark.core.rzapkinfo import RizinImp +from quark.core.r2apkinfo import R2Imp from quark.evaluator.pyeval import PyEval from quark.utils import tools from quark.utils.colors import ( @@ -49,6 +50,8 @@ def __init__(self, apk, core_library="androguard"): core_library = core_library.lower() if core_library == "rizin": self.apkinfo = RizinImp(apk) + elif core_library == "radare2": + self.apkinfo = R2Imp(apk) elif core_library == "androguard": self.apkinfo = AndroguardImp(apk) else: diff --git a/quark/core/r2apkinfo.py b/quark/core/r2apkinfo.py new file mode 100644 index 00000000..7e768b77 --- /dev/null +++ b/quark/core/r2apkinfo.py @@ -0,0 +1,702 @@ +# -*- coding: utf-8 -*- +# This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine +# See the file 'LICENSE' for copying permission. 
+ +import functools +import logging +import os.path +import re +import tempfile +import zipfile +from collections import defaultdict, namedtuple +from os import PathLike +from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union + +import r2pipe + +from quark.core.axmlreader import AxmlReader +from quark.core.interface.baseapkinfo import BaseApkinfo, XMLElement +from quark.core.struct.bytecodeobject import BytecodeObject +from quark.core.struct.methodobject import MethodObject +from quark.utils.tools import ( + descriptor_to_androguard_format, + remove_dup_list, +) + +R2Cache = namedtuple("r2_cache", "address is_imported") + +PRIMITIVE_TYPE_MAPPING = { + "void": "V", + "boolean": "Z", + "byte": "B", + "char": "C", + "short": "S", + "int": "I", + "long": "J", + "float": "F", + "double": "D", +} + +R2_ESCAPE_CHAR_LIST = ["$"] + + +class R2Imp(BaseApkinfo): + def __init__( + self, + apk_filepath: Union[str, PathLike], + tmp_dir: Union[str, PathLike] = None, + ): + super().__init__(apk_filepath, "radare2") + + if self.ret_type == "DEX": + self._tmp_dir = None + + elif self.ret_type == "APK": + self._tmp_dir = tempfile.mkdtemp() if tmp_dir is None else tmp_dir + + # Extract AndroidManifest.xml + with zipfile.ZipFile(self.apk_filepath) as apk: + apk.extract("AndroidManifest.xml", path=self._tmp_dir) + + self._manifest = os.path.join(self._tmp_dir, "AndroidManifest.xml") + + else: + raise ValueError("Unsupported File type.") + + @functools.cached_property + def _r2(self): + """ + Return a R2 object that opens the specified Dex file. 
+ + The object is created once and cached. A raw Dex file is opened + directly, whereas an APK is opened through the ``apk://`` URI. + :return: a R2 object on which the ``aa`` analysis command has been run + """ + if self.ret_type == "DEX": + r2 = r2pipe.open(f"{self.apk_filepath}") + elif self.ret_type == "APK": + r2 = r2pipe.open(f"apk://{self.apk_filepath}") + + r2.cmd("aa") + return r2 + + def _convert_type_to_type_signature(self, raw_type: str): + """ + Convert a Java type in the format of the Java language into the + one in the format of the Java VM type signature. + + For example, + + `int` will be converted into the Java VM type signature `I`. + + `long` will be converted into the Java VM type signature `J`. + + `java.lang.String...` will be converted into the Java VM type + signature `[Ljava/lang/String;`. + + :param raw_type: a type in the format of the Java language + :return: a type in the format of the Java VM type signature + """ + if not raw_type: + return raw_type + + if raw_type.endswith("[]"): + return "[" + self._convert_type_to_type_signature(raw_type[:-2]) + + if raw_type.startswith("["): + return "[" + self._convert_type_to_type_signature(raw_type[1:]) + + if "..." in raw_type: + index = raw_type.index("...") + return "[" + self._convert_type_to_type_signature(raw_type[:index]) + + if raw_type in PRIMITIVE_TYPE_MAPPING: + return PRIMITIVE_TYPE_MAPPING[raw_type] + + if "." in raw_type or "_" in raw_type: + raw_type = raw_type.replace(".", "/") + raw_type = raw_type.replace("_", "$") + return "L" + raw_type + ";" + + return raw_type + ";" + + @staticmethod + def _escape_str_in_r2_manner(raw_str: str): + """ + Convert characters with special meanings in R2 into `_`. + For now, the character is `$`. + + :param raw_str: a string that may consist of characters with special + meanings. + :return: a new string contains no characters with special meanings. 
+ """ + for c in R2_ESCAPE_CHAR_LIST: + raw_str = raw_str.replace(c, "_") + return raw_str + + def _parse_method_from_isj_obj(self, json_obj): + """ + Parse a JSON object provided by the R2 command `isj` or `is.j` into + an instance of MethodObject. + + :param json_obj: a JSON object provided by the R2 command `isj` or + `is.j` + :return: an instance of MethodObject, or None if the symbol is not a + function/method, its `realname` cannot be parsed, or its class is + an array type + """ + if json_obj.get("type") not in ["FUNC", "METH"]: + return None + + parse_pattern = re.compile(r"(^[\[|L].*)\.method\.(.*)(\(.*\).*)") + + # A missing or unexpected `realname` must not abort the whole scan. + matched = parse_pattern.match(json_obj.get("realname") or "") + if not matched: + return None + class_name, method_name, descriptor = matched.groups() + + # -- Descriptor -- + descriptor = descriptor_to_androguard_format(descriptor) + + # -- Is imported -- + is_imported = json_obj.get("is_imported") + + # -- Method name -- + method_name = self._escape_str_in_r2_manner(method_name) + if method_name.endswith("_"): + method_name = method_name[:-1] + + # -- Class name -- + + # Exclude start with "imp.[" + if class_name.startswith("["): + return None + + class_name = self._convert_type_to_type_signature(class_name) + + # Append the method + method = MethodObject( + class_name=class_name, + name=method_name, + descriptor=descriptor, + cache=R2Cache(json_obj["vaddr"], is_imported), + ) + + return method + + @functools.lru_cache + def _get_methods_classified(self): + """ + Parse all methods in the specified Dex and convert them into a + dictionary. The dictionary takes their belonging classes as the keys. + Then, it categorizes them into lists. + + :return: a dictionary taking a class name as the key and a list of + MethodObject as the corresponding value. 
+ """ + method_json_list = self._r2.cmdj("isj") + method_dict = defaultdict(list) + for json_obj in method_json_list: + method = self._parse_method_from_isj_obj(json_obj) + + if method: + method_dict[method.class_name].append(method) + + # Remove duplicates + for class_name, method_list in method_dict.items(): + method_dict[class_name] = remove_dup_list(method_list) + + return method_dict + + @functools.cached_property + def permissions(self) -> List[str]: + """ + Inherited from baseapkinfo.py. + Return the permissions used by the sample. + + :return: a list of permissions. + """ + axml = AxmlReader(self._manifest, core_library="radare2") + elm_key_name = "{http://schemas.android.com/apk/res/android}name" + permission_list = set() + for elm in axml.get_xml_tree().iter("uses-permission"): + permission = elm.attrib.get(elm_key_name) + permission_list.add(permission) + + return permission_list + + @functools.cached_property + def application(self) -> XMLElement: + """Get the application element from the manifest file. + + :return: an application element + """ + + axml = AxmlReader(self._manifest, core_library="radare2") + root = axml.get_xml_tree() + + return root.find("application") + + @functools.cached_property + def activities(self) -> List[XMLElement]: + """ + Return all activity from given APK. + + :return: a list of all activities + """ + axml = AxmlReader(self._manifest, core_library="radare2") + root = axml.get_xml_tree() + + return root.findall("application/activity") + + @functools.cached_property + def receivers(self) -> List[XMLElement]: + """ + Return all receivers from the given APK. + + :return: a list of all receivers + """ + axml = AxmlReader(self._manifest, core_library="radare2") + root = axml.get_xml_tree() + + return root.findall("application/receiver") + + @property + def android_apis(self) -> Set[MethodObject]: + """ + Inherited from baseapkinfo.py. + Return all Android native APIs used by the sample. 
+ + :return: a set of MethodObjects + """ + return { + method + for method in self.all_methods + if method.is_android_api() and method.cache.is_imported + } + + @property + def custom_methods(self) -> Set[MethodObject]: + """_ + Inherited from baseapkinfo.py. + Return all custom methods declared by the sample. + + :return: a set of MethodObjects + """ + return { + method + for method in self.all_methods + if not method.cache.is_imported + } + + @functools.cached_property + def all_methods(self) -> Set[MethodObject]: + """_ + Inherited from baseapkinfo.py. + Return all methods including Android native APIs and custom methods + declared in the sample. + + :return: a set of MethodObjects + """ + method_set = set() + for method_list in self._get_methods_classified().values(): + method_set.update(method_list) + + return method_set + + def find_method( + self, + class_name: Optional[str] = ".*", + method_name: Optional[str] = ".*", + descriptor: Optional[str] = ".*", + ) -> List[MethodObject]: + """ + Inherited from baseapkinfo.py. + Find a method with the given class name, method name, and descriptor. + + :param class_name: the class name of the target method. Defaults to + ".*" + :param method_name: the method name of the target method. Defaults to + ".*" + :param descriptor: the descriptor of the target method. 
Defaults to + ".*" + :return: a list of the target MethodObject + """ + if not class_name: + class_name = ".*" + + if not method_name: + method_name = ".*" + + if method_name != ".*": + method_name = re.escape(method_name) + + if not descriptor: + descriptor = ".*" + + if descriptor != ".*": + descriptor = re.escape(descriptor) + + def method_filter(method): + return re.match(method_name, method.name) and re.match( + descriptor, method.descriptor + ) + + filtered_methods = list() + + if class_name != ".*": + method_dict = self._get_methods_classified() + filtered_methods += list( + filter(method_filter, method_dict[class_name]) + ) + else: + method_dict = self._get_methods_classified() + for key_name in method_dict: + filtered_methods += list( + filter(method_filter, method_dict[key_name]) + ) + + return filtered_methods + + @functools.lru_cache + def upperfunc(self, method_object: MethodObject) -> Set[MethodObject]: + """ + Inherited from baseapkinfo.py. + Find the xrefs from the specified method. + + :param method_object: a target method which the returned methods + should call + :return: a set of MethodObjects + """ + cache = method_object.cache + + xrefs = self._r2.cmdj(f"axtj @ {cache.address}") + upperfunc_set = set() + for xref in xrefs: + if xref["type"] != "CALL": + continue + + if "from" in xref: + matched_method = self._get_method_by_address(xref["from"]) + if not matched_method: + logging.debug( + f"Cannot identify function at {xref['from']}." + ) + continue + + upperfunc_set.add(matched_method) + else: + logging.debug( + f"Key from was not found when trying to search" + f" upper methods of {method_object}." + ) + + return upperfunc_set + + @functools.lru_cache + def lowerfunc( + self, method_object: MethodObject + ) -> Set[Tuple[MethodObject, int]]: + """ + Inherited from baseapkinfo.py. + Find the xrefs to the specified method. 
+ + :param method_object: a target method used to find what methods it + calls + :return: a set of tuples consisting of the called method and the + offset of the invocation + """ + cache = method_object.cache + + instruct_flow = self._r2.cmdj(f"pdfj @ {cache.address}")["ops"] + + lowerfunc_list = [] + for ins in instruct_flow: + if "refs" in ins: + call_xrefs = ( + xref + for xref in ins["refs"] + if xref["type"] == "CALL" + ) + + for call_xref in call_xrefs: + lowerfunc = self._get_method_by_address(call_xref["addr"]) + if not lowerfunc: + logging.debug( + f"Cannot identify function at {call_xref['addr']}." + ) + continue + + offset = ins["offset"] - cache.address + + lowerfunc_list.append((lowerfunc, offset)) + + return lowerfunc_list + + def get_method_bytecode( + self, method_object: MethodObject + ) -> Generator[BytecodeObject, None, None]: + """ + Inherited from baseapkinfo.py. + Return the bytecodes of the specified method. + + :param method_object: a target method to get the corresponding + bytecodes + :yield: a generator of BytecodeObjects + """ + cache = method_object.cache + if not cache.is_imported: + + instruct_flow = self._r2.cmdj(f"pdfj @ {cache.address}")["ops"] + if instruct_flow: + for ins in instruct_flow: + if "disasm" not in ins: + continue + + yield self._parse_smali(ins["disasm"]) + + def get_strings(self) -> Set[str]: + """ + Inherited from baseapkinfo.py. + Return all strings in the sample. + + :return: a set of strings + """ + strings = set() + string_detail_list = self._r2.cmdj("izzj") + strings.update( + [string_detail["string"] for string_detail in string_detail_list] + ) + + return strings + + def get_wrapper_smali( + self, + parent_method: MethodObject, + first_method: MethodObject, + second_method: MethodObject, + ) -> Dict[str, Union[BytecodeObject, str]]: + """ + Inherited from baseapkinfo.py. + Find the invocations that call two specified methods, first_method + and second_method, respectively. 
Then, return a dictionary storing + the corresponding bytecodes and hex values. + + :param parent_method: a parent method to scan + :param first_method: the first method called by the parent method + :param second_method: the second method called by the parent method + :return: a dictionary storing the corresponding bytecodes and hex + values. + """ + + def convert_bytecode_to_list(bytecode): + return [bytecode.mnemonic] + bytecode.registers + [bytecode.parameter] + + cache = parent_method.cache + + result = { + "first": None, + "first_hex": None, + "second": None, + "second_hex": None, + } + + search_pattern = "{class_name}.{name}{descriptor}" + first_method_pattern = search_pattern.format( + class_name=first_method.class_name[:-1], + name=first_method.name, + descriptor=first_method.descriptor, + ) + second_method_pattern = search_pattern.format( + class_name=second_method.class_name[:-1], + name=second_method.name, + descriptor=second_method.descriptor, + ) + + if cache.is_imported: + return {} + + instruction_flow = self._r2.cmdj(f"pdfj @ {cache.address}")["ops"] + + if instruction_flow: + for ins in instruction_flow: + # Skip the instruction without disam field. 
+ if "disasm" not in ins: + continue + + if ins["disasm"].startswith("invoke"): + if ";" in ins["disasm"]: + index = ins["disasm"].rindex(";") + instrcution_string = ins["disasm"][:index] + + if first_method_pattern in instrcution_string: + result["first"] = convert_bytecode_to_list( + self._parse_smali(instrcution_string) + ) + result["first_hex"] = " ".join( + map( + lambda r: r.group(0), + re.finditer(r"\w{2}", ins["bytes"]), + ) + ) + if second_method_pattern in instrcution_string: + result["second"] = convert_bytecode_to_list( + self._parse_smali(instrcution_string) + ) + result["second_hex"] = " ".join( + map( + lambda r: r.group(0), + re.finditer(r"\w{2}", ins["bytes"]), + ) + ) + + return result + + @functools.cached_property + def superclass_relationships(self) -> Dict[str, Set[str]]: + """ + Inherited from baseapkinfo.py. + Return a dictionary holding the inheritance relationship of classes in + the sample. The dictionary takes a class name as the key and the + corresponding superclass as the value. + + :return: a dictionary taking a class name as the key and the + corresponding superclass as the value. + """ + hierarchy_dict = defaultdict(set) + + class_info_list = self._r2.cmdj("icj") + for class_info in class_info_list: + class_name = class_info["classname"] + class_name = self._convert_type_to_type_signature(class_name) + super_classes = class_info["super"] + + for super_class in super_classes: + hierarchy_dict[class_name].add(super_class) + + return hierarchy_dict + + @functools.cached_property + def subclass_relationships(self) -> Dict[str, Set[str]]: + """ + Inherited from baseapkinfo.py. + Return a dictionary holding the inheritance relationship of classes in + the sample, indexed by superclass. The dictionary takes a class name + as the key and the set of classes that declare it as their superclass + (its subclasses) as the value. 
+ + :return: a dictionary taking a class name as the key and the + corresponding subclasses as the value. + """ + hierarchy_dict = defaultdict(set) + + class_info_list = self._r2.cmdj("icj") + for class_info in class_info_list: + class_name = class_info["classname"] + super_class = class_info["super"] + + hierarchy_dict[super_class].add(class_name) + + return hierarchy_dict + + def _get_method_by_address(self, address: int) -> MethodObject: + """ + Find a method via a specified address. + + :param address: an address used to find the corresponding method + :return: the MethodObject of the method in the given address + """ + json_data = self._r2.cmdj(f"is.j @ {address}") + json_data = json_data.get("symbols") + + if json_data: + return self._parse_method_from_isj_obj(json_data) + else: + return None + + def _get_string_by_address(self, address: str) -> str: + """ + Find the content of string via the specified string address. + + :param address: an address used to find the corresponding method + :return: the content in the given address + """ + content = self._r2.cmd(f"pfq z @ {int(address, 16)}") + return content + + @staticmethod + def _parse_parameter(parameter: str, p_type: str = "int") -> Any: + """Parse the value of the parameter based on the mnemonic. + + :param mnemonic: the mnemonic of a bytecode + :param parameter: the parameter of a bytecode + :return: the value of the parameter + """ + if p_type == "int": + try: + parameter = int(parameter, 16) + except (TypeError, ValueError): + return R2Imp._parse_parameter(parameter, "float") + + elif p_type == "float": + try: + parameter = float(parameter) + except (TypeError, ValueError): + return R2Imp._parse_parameter(parameter, "str") + + elif p_type == "str": + parameter = re.sub(r"\.", ";->", parameter, count=1) + # Skip extra parameter. e.g. 
def _parse_smali(self, smali: str) -> BytecodeObject:
    """
    Build a BytecodeObject from one Smali instruction string as printed
    by the R2 command `pdfj`.

    The text is split into a mnemonic and its operands. A trailing
    operand that does not start with ``v`` is taken as the parameter;
    the remaining operands are registers, written either individually
    or as a range.

    :param smali: a Smali code provided by the R2 command `pdfj`
    :raises ValueError: if the Smali code is empty or follows an unknown
        format
    :return: a BytecodeObject
    """
    if smali == "":
        raise ValueError("Argument cannot be empty.")

    # No space means there is only a mnemonic and nothing to parse.
    if " " not in smali:
        return BytecodeObject(smali, None, None)

    # Split into two parts: the mnemonic and everything after it.
    mnemonic, operand_text = smali.split(maxsplit=1)

    operands = []
    for piece in re.split("[{},]+", operand_text):
        if piece:
            operands.append(piece.strip())

    # A const-string operand given as an address is replaced by the
    # string content stored at that address.
    if mnemonic == "const-string" and operands[-1].startswith("0x"):
        operands[-1] = self._get_string_by_address(operands[-1])

    # Take the trailing non-register operand off as the parameter.
    parameter = None
    if operands and not operands[-1].startswith("v"):
        parameter = R2Imp._parse_parameter(operands.pop())

    is_ranged = len(operands) == 1 and (
        ":" in operands[0] or ".." in operands[0]
    )

    if is_ranged:
        span = operands[0]
        indexes = [
            int(token[1:]) for token in re.split("[:.]+", span) if token
        ]
        # ".." denotes an inclusive range between the two endpoints.
        if ".." in span:
            indexes = range(indexes[0], indexes[1] + 1)
    elif operands:
        try:
            indexes = [int(operand[1:]) for operand in operands]
        except ValueError:
            raise ValueError(
                f"Cannot parse bytecode. Unknown smali {smali}."
            )
    else:
        indexes = []

    registers = [f"v{index}" for index in indexes]

    return BytecodeObject(mnemonic, registers, parameter)
@staticmethod
@pytest.mark.parametrize(
    "test_input, expected",
    [
        ("Landroid/view/KeyEvent;", str),
        # An int (not a str) makes int(parameter, 16) raise TypeError, so
        # _parse_parameter falls through to the float conversion.
        (0x3E8, float),
        (
            "Ljava/lang/StringBuilder;->append(Ljava/lang/String;)"
            "Ljava/lang/StringBuilder;",
            str,
        ),
        ("str.google.c.a.tc", str),
    ],
)
def test_parse_parameter(test_input, expected, apkinfo_with_R2Imp_only):
    """Check the type R2Imp._parse_parameter returns for each input."""
    parsed_param = apkinfo_with_R2Imp_only._parse_parameter(test_input)
    assert isinstance(parsed_param, expected)
AxmlReader(MANIFEST_PATH_14d9f, core_library) assert axmlReader.get_string(13) == "manifest" @staticmethod - def testGetAttributes(MANIFEST_PATH_14d9f): - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testGetAttributes(core_library, MANIFEST_PATH_14d9f): + axmlReader = AxmlReader(MANIFEST_PATH_14d9f, core_library) manifestTag = list(axmlReader)[1] expectedAttributes = [ @@ -68,8 +78,8 @@ def testGetAttributes(MANIFEST_PATH_14d9f): assert expectedAttrib == attrib @staticmethod - def testGetXmlTree(MANIFEST_PATH_14d9f): - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testGetXmlTree(core_library, MANIFEST_PATH_14d9f): + axmlReader = AxmlReader(MANIFEST_PATH_14d9f, core_library) xml = axmlReader.get_xml_tree() manifestLabel = xml.getroot() assert len(manifestLabel.findall("uses-sdk")) == 1