diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index a2fc5a73..e8e6b0dc 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -26,7 +26,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install pytest rzpipe meson==0.62.0 ninja coverage ciphey frida objection + python -m pip install pytest rzpipe meson==0.62.0 ninja coverage ciphey frida objection r2pipe==1.8.0 # Install graphviz & ninja sudo apt-get -y install graphviz ninja-build @@ -39,6 +39,14 @@ jobs: sudo ninja -C build install sudo ldconfig -v cd - + + # Install Radare2 (5.8.8) + sudo apt install -y musl-tools + sudo git clone https://github.com/radareorg/radare2 /opt/radare2/ + cd /opt/radare2/ + sudo git checkout 5.8.8 + sudo sys/install.sh + cd - # Install click >= 8.0.0 for CLI supports python -m pip install click==8.0.3 diff --git a/Pipfile b/Pipfile index 6b90bf93..eed1bf14 100644 --- a/Pipfile +++ b/Pipfile @@ -25,6 +25,7 @@ rzpipe = "<=0.1.2" objection = "<=1.11.0" frida = "<=15.2.2" ciphey = ">=5.0.0,<=5.14.0" +r2pipe = "==1.8.0" [requires] python_version = "3.8" diff --git a/quark/cli.py b/quark/cli.py index 6f5a6e28..6d06ca97 100644 --- a/quark/cli.py +++ b/quark/cli.py @@ -133,7 +133,7 @@ "--core-library", "core_library", help="Specify the core library used to analyze an APK", - type=click.Choice(("androguard", "rizin"), case_sensitive=False), + type=click.Choice(("androguard", "rizin", "radare2"), case_sensitive=False), required=False, default="androguard", ) diff --git a/quark/core/axmlreader/__init__.py b/quark/core/axmlreader/__init__.py index a062a37e..d8f7dd98 100644 --- a/quark/core/axmlreader/__init__.py +++ b/quark/core/axmlreader/__init__.py @@ -8,6 +8,7 @@ import pkg_resources import rzpipe +import r2pipe # Resource Types Definition # Please reference to @@ -84,7 +85,7 @@ class AxmlReader(object): A Class that parses the Android XML file """ - def __init__(self, file_path, 
structure_path=None): + def __init__(self, file_path, core_library="rizin", structure_path=None): if structure_path is None: structure_path = pkg_resources.resource_filename( "quark.core.axmlreader", "axml_definition" @@ -96,10 +97,13 @@ def __init__(self, file_path, structure_path=None): f" of Rizin in {structure_path}" ) - self._rz = rzpipe.open(file_path) - self._rz.cmd(f"pfo {structure_path}") + if core_library == "rizin": + self._core = rzpipe.open(file_path) + else: + self._core = r2pipe.open(file_path) + self._core.cmd(f"pfo {structure_path}") - self._file_size = int(self._rz.cmd("i~size[1]"), 16) + self._file_size = int(self._core.cmd("i~size[1]"), 16) self._ptr = 0 self._cache = {} @@ -110,7 +114,7 @@ def __init__(self, file_path, structure_path=None): raise AxmlException("Filesize exceeds theoretical lower bound.") # File Header - header = self._rz.cmdj("pfj axml_ResChunk_header @ 0x0") + header = self._core.cmdj("pfj axml_ResChunk_header @ 0x0") self._data_type = header[0]["value"] self._axml_size = header[2]["value"] @@ -133,7 +137,7 @@ def __init__(self, file_path, structure_path=None): return # String Pool - string_pool_header = self._rz.cmdj("pfj axml_ResStringPool_header @ 8") + string_pool_header = self._core.cmdj("pfj axml_ResStringPool_header @ 8") string_pool_size = string_pool_header[0]["value"][2]["value"] @@ -163,18 +167,18 @@ def __init__(self, file_path, structure_path=None): self._stringCount = string_pool_header[1]["value"] stringStart = string_pool_header[4]["value"] - self._rz.cmd(f"f string_pool_header @ 0x8 ") + self._core.cmd(f"f string_pool_header @ 0x8 ") string_pool_index = header_size + self._ptr - self._rz.cmd(f"f string_pool_index @ { string_pool_index }") + self._core.cmd(f"f string_pool_index @ { string_pool_index }") string_pool_data = stringStart + self._ptr - self._rz.cmd(f"f string_pool_data @ { string_pool_data }") + self._core.cmd(f"f string_pool_data @ { string_pool_data }") self._ptr += string_pool_size if self._ptr 
>= self._axml_size: return # Resource Map (Optional) - header = self._rz.cmdj(f"pfj axml_ResChunk_header @ {self._ptr}") + header = self._core.cmdj(f"pfj axml_ResChunk_header @ {self._ptr}") header_type = header[0]["value"] if header_type == RES_XML_RESOURCE_MAP_TYPE: @@ -201,7 +205,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: :yield: header of a resource chunk defined in the binary """ while self._axml_size - self._ptr >= 16: - header = self._rz.cmdj(f"pfj axml_ResXMLTree_node @ {self._ptr}") + header = self._core.cmdj(f"pfj axml_ResXMLTree_node @ {self._ptr}") node_type = header[0]["value"][0]["value"] header_size = header[0]["value"][1]["value"] @@ -224,7 +228,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: chunk = {"Address": self._ptr, "Type": node_type} if node_type == RES_XML_START_ELEMENT_TYPE: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_attrExt @ { ext_ptr }" ) @@ -235,7 +239,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: # node['AttrCount'] = ext[4]['value'] elif node_type == RES_XML_END_ELEMENT_TYPE: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_endElementExt @ { ext_ptr }" ) @@ -246,7 +250,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: RES_XML_START_NAMESPACE_TYPE, RES_XML_END_NAMESPACE_TYPE, ]: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_namespaceExt @ { ext_ptr }" ) @@ -254,7 +258,7 @@ def __iter__(self) -> Iterator[ResChunkHeader]: chunk["Uri"] = ext[1]["value"][0]["value"] elif node_type == RES_XML_CDATA_TYPE: - ext = self._rz.cmdj( + ext = self._core.cmdj( f"pfj axml_ResXMLTree_cdataExt @ { ext_ptr }" ) @@ -281,7 +285,7 @@ def get_string(self, index): if index < 0 or index >= self._stringCount: return None - return self._rz.cmdj( + return self._core.cmdj( f"pfj Z @ string_pool_data + `pfv n4 " f"@ string_pool_index+ {index}*4` + 2" )[0]["string"] @@ -296,14 +300,14 @@ def get_attributes(self, chunk: ResChunkHeader) -> List[ResValue]: return None 
extAddress = int(chunk["Address"]) + 16 - attrExt = self._rz.cmdj(f"pfj axml_ResXMLTree_attrExt @ {extAddress}") + attrExt = self._core.cmdj(f"pfj axml_ResXMLTree_attrExt @ {extAddress}") attrAddress = extAddress + attrExt[2]["value"] attributeSize = attrExt[3]["value"] attributeCount = attrExt[4]["value"] attributes = [] for _ in range(attributeCount): - attr = self._rz.cmdj( + attr = self._core.cmdj( f"pfj axml_ResXMLTree_attribute @ {attrAddress}" ) @@ -402,6 +406,6 @@ def get_xml_tree(self) -> XMLElementTree: def __del__(self): try: - self._rz.quit() + self._core.quit() except BaseException: pass diff --git a/quark/core/quark.py b/quark/core/quark.py index 4ecab5e1..423f1acf 100644 --- a/quark/core/quark.py +++ b/quark/core/quark.py @@ -14,6 +14,7 @@ from quark.core.analysis import QuarkAnalysis from quark.core.apkinfo import AndroguardImp from quark.core.rzapkinfo import RizinImp +from quark.core.r2apkinfo import R2Imp from quark.evaluator.pyeval import PyEval from quark.utils import tools from quark.utils.colors import ( @@ -49,6 +50,8 @@ def __init__(self, apk, core_library="androguard"): core_library = core_library.lower() if core_library == "rizin": self.apkinfo = RizinImp(apk) + elif core_library == "radare2": + self.apkinfo = R2Imp(apk) elif core_library == "androguard": self.apkinfo = AndroguardImp(apk) else: diff --git a/quark/core/r2apkinfo.py b/quark/core/r2apkinfo.py new file mode 100644 index 00000000..7e768b77 --- /dev/null +++ b/quark/core/r2apkinfo.py @@ -0,0 +1,702 @@ +# -*- coding: utf-8 -*- +# This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine +# See the file 'LICENSE' for copying permission. 
+ +import functools +import logging +import os.path +import re +import tempfile +import zipfile +from collections import defaultdict, namedtuple +from os import PathLike +from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union + +import r2pipe + +from quark.core.axmlreader import AxmlReader +from quark.core.interface.baseapkinfo import BaseApkinfo, XMLElement +from quark.core.struct.bytecodeobject import BytecodeObject +from quark.core.struct.methodobject import MethodObject +from quark.utils.tools import ( + descriptor_to_androguard_format, + remove_dup_list, +) + +R2Cache = namedtuple("r2_cache", "address is_imported") + +PRIMITIVE_TYPE_MAPPING = { + "void": "V", + "boolean": "Z", + "byte": "B", + "char": "C", + "short": "S", + "int": "I", + "long": "J", + "float": "F", + "double": "D", +} + +R2_ESCAPE_CHAR_LIST = ["$"] + + +class R2Imp(BaseApkinfo): + def __init__( + self, + apk_filepath: Union[str, PathLike], + tmp_dir: Union[str, PathLike] = None, + ): + super().__init__(apk_filepath, "radare2") + + if self.ret_type == "DEX": + self._tmp_dir = None + + elif self.ret_type == "APK": + self._tmp_dir = tempfile.mkdtemp() if tmp_dir is None else tmp_dir + + # Extract AndroidManifest.xml + with zipfile.ZipFile(self.apk_filepath) as apk: + apk.extract("AndroidManifest.xml", path=self._tmp_dir) + + self._manifest = os.path.join(self._tmp_dir, "AndroidManifest.xml") + + else: + raise ValueError("Unsupported File type.") + + @functools.cached_property + def _r2(self): + """ + Return a R2 object that opens the specified Dex file. 
+ + :return: a R2 object opened on `self.apk_filepath` (through the + `apk://` URI for an APK) after the `aa` command has been issued + """ + if self.ret_type == "DEX": + r2 = r2pipe.open(f"{self.apk_filepath}") + elif self.ret_type == "APK": + r2 = r2pipe.open(f"apk://{self.apk_filepath}") + + r2.cmd("aa") + return r2 + + def _convert_type_to_type_signature(self, raw_type: str): + """ + Convert a Java type in the format of the Java language into the + one in the format of the Java VM type signature. + + For example, + + `int` will be converted into the Java VM type signature `I`. + + `long` will be converted into the Java VM type signature `J`. + + `java.lang.String...` will be converted into the Java VM type + signature `[Ljava/lang/String;`. + + :param raw_type: a type in the format of the Java language + :return: a type in the format of the Java VM type signature; an empty + or None value is returned unchanged + """ + if not raw_type: + return raw_type + + if raw_type.endswith("[]"): + return "[" + self._convert_type_to_type_signature(raw_type[:-2]) + + if raw_type.startswith("["): + return "[" + self._convert_type_to_type_signature(raw_type[1:]) + + if "..." in raw_type: + index = raw_type.index("...") + return "[" + self._convert_type_to_type_signature(raw_type[:index]) + + if raw_type in PRIMITIVE_TYPE_MAPPING: + return PRIMITIVE_TYPE_MAPPING[raw_type] + + if "." in raw_type or "_" in raw_type: + raw_type = raw_type.replace(".", "/") + raw_type = raw_type.replace("_", "$") + return "L" + raw_type + ";" + + return raw_type + ";" + + @staticmethod + def _escape_str_in_r2_manner(raw_str: str): + """ + Convert characters with special meanings in R2 into `_`. + For now, the character is `$`. + + :param raw_str: a string that may consist of characters with special + meanings. + :return: a new string containing no characters with special meanings. 
+ """ + for c in R2_ESCAPE_CHAR_LIST: + raw_str = raw_str.replace(c, "_") + return raw_str + + def _parse_method_from_isj_obj(self, json_obj): + """ + Parse a JSON object provided by the R2 command `isj` or `is.j` into + an instance of MethodObject. + + :param json_obj: a JSON object provided by the R2 command `isj` or + `is.j` + :return: an instance of MethodObject, or None when the object is not + a FUNC/METH symbol, has no `realname`, has a `realname` that does + not follow the `CLASS.method.NAME(ARGS)RET` layout, or belongs to + a class whose name starts with `[` + """ + if json_obj.get("type") not in ["FUNC", "METH"]: + return None + + parse_pattern = re.compile(r"(^[\[|L].*)\.method\.(.*)(\(.*\).*)") + + real_name = json_obj.get("realname") + if not real_name: + return None + + matched = parse_pattern.match(real_name) + if matched is None: + # The name does not follow the expected layout; report it as + # unparseable instead of raising AttributeError on None. + return None + + class_name, method_name, descriptor = matched.groups() + + # -- Descriptor -- + descriptor = descriptor_to_androguard_format(descriptor) + + # -- Is imported -- + is_imported = json_obj.get("is_imported") + + # -- Method name -- + method_name = self._escape_str_in_r2_manner(method_name) + if method_name.endswith("_"): + method_name = method_name[:-1] + + # -- Class name -- + + # Skip symbols whose class name starts with "[" + if class_name.startswith("["): + return None + + class_name = self._convert_type_to_type_signature(class_name) + + # Append the method + method = MethodObject( + class_name=class_name, + name=method_name, + descriptor=descriptor, + cache=R2Cache(json_obj["vaddr"], is_imported), + ) + + return method + + @functools.lru_cache + def _get_methods_classified(self): + """ + Parse all methods in the specified Dex and convert them into a + dictionary. The dictionary takes their belonging classes as the keys. + Then, it categorizes them into lists. + + :return: a dictionary taking a class name as the key and a list of + MethodObject as the corresponding value. 
+ """ + method_json_list = self._r2.cmdj("isj") + method_dict = defaultdict(list) + for json_obj in method_json_list: + method = self._parse_method_from_isj_obj(json_obj) + + if method: + method_dict[method.class_name].append(method) + + # Remove duplicates + for class_name, method_list in method_dict.items(): + method_dict[class_name] = remove_dup_list(method_list) + + return method_dict + + @functools.cached_property + def permissions(self) -> List[str]: + """ + Inherited from baseapkinfo.py. + Return the permissions used by the sample. + + :return: a list of permissions. + """ + axml = AxmlReader(self._manifest, core_library="radare2") + elm_key_name = "{http://schemas.android.com/apk/res/android}name" + permission_list = set() + for elm in axml.get_xml_tree().iter("uses-permission"): + permission = elm.attrib.get(elm_key_name) + permission_list.add(permission) + + return permission_list + + @functools.cached_property + def application(self) -> XMLElement: + """Get the application element from the manifest file. + + :return: an application element + """ + + axml = AxmlReader(self._manifest, core_library="radare2") + root = axml.get_xml_tree() + + return root.find("application") + + @functools.cached_property + def activities(self) -> List[XMLElement]: + """ + Return all activity from given APK. + + :return: a list of all activities + """ + axml = AxmlReader(self._manifest, core_library="radare2") + root = axml.get_xml_tree() + + return root.findall("application/activity") + + @functools.cached_property + def receivers(self) -> List[XMLElement]: + """ + Return all receivers from the given APK. + + :return: a list of all receivers + """ + axml = AxmlReader(self._manifest, core_library="radare2") + root = axml.get_xml_tree() + + return root.findall("application/receiver") + + @property + def android_apis(self) -> Set[MethodObject]: + """ + Inherited from baseapkinfo.py. + Return all Android native APIs used by the sample. 
+ + :return: a set of MethodObjects + """ + return { + method + for method in self.all_methods + if method.is_android_api() and method.cache.is_imported + } + + @property + def custom_methods(self) -> Set[MethodObject]: + """_ + Inherited from baseapkinfo.py. + Return all custom methods declared by the sample. + + :return: a set of MethodObjects + """ + return { + method + for method in self.all_methods + if not method.cache.is_imported + } + + @functools.cached_property + def all_methods(self) -> Set[MethodObject]: + """_ + Inherited from baseapkinfo.py. + Return all methods including Android native APIs and custom methods + declared in the sample. + + :return: a set of MethodObjects + """ + method_set = set() + for method_list in self._get_methods_classified().values(): + method_set.update(method_list) + + return method_set + + def find_method( + self, + class_name: Optional[str] = ".*", + method_name: Optional[str] = ".*", + descriptor: Optional[str] = ".*", + ) -> List[MethodObject]: + """ + Inherited from baseapkinfo.py. + Find a method with the given class name, method name, and descriptor. + + :param class_name: the class name of the target method. Defaults to + ".*" + :param method_name: the method name of the target method. Defaults to + ".*" + :param descriptor: the descriptor of the target method. 
Defaults to + ".*" + :return: a list of the target MethodObject + """ + if not class_name: + class_name = ".*" + + if not method_name: + method_name = ".*" + + if method_name != ".*": + method_name = re.escape(method_name) + + if not descriptor: + descriptor = ".*" + + if descriptor != ".*": + descriptor = re.escape(descriptor) + + def method_filter(method): + return re.match(method_name, method.name) and re.match( + descriptor, method.descriptor + ) + + filtered_methods = list() + + if class_name != ".*": + method_dict = self._get_methods_classified() + filtered_methods += list( + filter(method_filter, method_dict[class_name]) + ) + else: + method_dict = self._get_methods_classified() + for key_name in method_dict: + filtered_methods += list( + filter(method_filter, method_dict[key_name]) + ) + + return filtered_methods + + @functools.lru_cache + def upperfunc(self, method_object: MethodObject) -> Set[MethodObject]: + """ + Inherited from baseapkinfo.py. + Find the xrefs from the specified method. + + :param method_object: a target method which the returned methods + should call + :return: a set of MethodObjects + """ + cache = method_object.cache + + xrefs = self._r2.cmdj(f"axtj @ {cache.address}") + upperfunc_set = set() + for xref in xrefs: + if xref["type"] != "CALL": + continue + + if "from" in xref: + matched_method = self._get_method_by_address(xref["from"]) + if not matched_method: + logging.debug( + f"Cannot identify function at {xref['from']}." + ) + continue + + upperfunc_set.add(matched_method) + else: + logging.debug( + f"Key from was not found when trying to search" + f" upper methods of {method_object}." + ) + + return upperfunc_set + + @functools.lru_cache + def lowerfunc( + self, method_object: MethodObject + ) -> Set[Tuple[MethodObject, int]]: + """ + Inherited from baseapkinfo.py. + Find the xrefs to the specified method. 
+ + :param method_object: a target method used to find what methods it + calls + :return: a set of tuples consisting of the called method and the + offset of the invocation + """ + cache = method_object.cache + + instruct_flow = self._r2.cmdj(f"pdfj @ {cache.address}")["ops"] + + lowerfunc_list = [] + for ins in instruct_flow: + if "refs" in ins: + call_xrefs = ( + xref + for xref in ins["refs"] + if xref["type"] == "CALL" + ) + + for call_xref in call_xrefs: + lowerfunc = self._get_method_by_address(call_xref["addr"]) + if not lowerfunc: + logging.debug( + f"Cannot identify function at {call_xref['addr']}." + ) + continue + + offset = ins["offset"] - cache.address + + lowerfunc_list.append((lowerfunc, offset)) + + return lowerfunc_list + + def get_method_bytecode( + self, method_object: MethodObject + ) -> Generator[BytecodeObject, None, None]: + """ + Inherited from baseapkinfo.py. + Return the bytecodes of the specified method. + + :param method_object: a target method to get the corresponding + bytecodes + :yield: a generator of BytecodeObjects + """ + cache = method_object.cache + if not cache.is_imported: + + instruct_flow = self._r2.cmdj(f"pdfj @ {cache.address}")["ops"] + if instruct_flow: + for ins in instruct_flow: + if "disasm" not in ins: + continue + + yield self._parse_smali(ins["disasm"]) + + def get_strings(self) -> Set[str]: + """ + Inherited from baseapkinfo.py. + Return all strings in the sample. + + :return: a set of strings + """ + strings = set() + string_detail_list = self._r2.cmdj("izzj") + strings.update( + [string_detail["string"] for string_detail in string_detail_list] + ) + + return strings + + def get_wrapper_smali( + self, + parent_method: MethodObject, + first_method: MethodObject, + second_method: MethodObject, + ) -> Dict[str, Union[BytecodeObject, str]]: + """ + Inherited from baseapkinfo.py. + Find the invocations that call two specified methods, first_method + and second_method, respectively. 
Then, return a dictionary storing + the corresponding bytecodes and hex values. + + :param parent_method: a parent method to scan + :param first_method: the first method called by the parent method + :param second_method: the second method called by the parent method + :return: a dictionary storing the corresponding bytecodes and hex + values; an empty dictionary if the parent method is imported. + """ + + def convert_bytecode_to_list(bytecode): + return [bytecode.mnemonic] + bytecode.registers + [bytecode.parameter] + + cache = parent_method.cache + + result = { + "first": None, + "first_hex": None, + "second": None, + "second_hex": None, + } + + search_pattern = "{class_name}.{name}{descriptor}" + first_method_pattern = search_pattern.format( + class_name=first_method.class_name[:-1], + name=first_method.name, + descriptor=first_method.descriptor, + ) + second_method_pattern = search_pattern.format( + class_name=second_method.class_name[:-1], + name=second_method.name, + descriptor=second_method.descriptor, + ) + + if cache.is_imported: + return {} + + instruction_flow = self._r2.cmdj(f"pdfj @ {cache.address}")["ops"] + + if instruction_flow: + for ins in instruction_flow: + # Skip instructions that carry no `disasm` field. 
+ if "disasm" not in ins: + continue + + if ins["disasm"].startswith("invoke"): + if ";" in ins["disasm"]: + index = ins["disasm"].rindex(";") + instrcution_string = ins["disasm"][:index] + + if first_method_pattern in instrcution_string: + result["first"] = convert_bytecode_to_list( + self._parse_smali(instrcution_string) + ) + result["first_hex"] = " ".join( + map( + lambda r: r.group(0), + re.finditer(r"\w{2}", ins["bytes"]), + ) + ) + if second_method_pattern in instrcution_string: + result["second"] = convert_bytecode_to_list( + self._parse_smali(instrcution_string) + ) + result["second_hex"] = " ".join( + map( + lambda r: r.group(0), + re.finditer(r"\w{2}", ins["bytes"]), + ) + ) + + return result + + @functools.cached_property + def superclass_relationships(self) -> Dict[str, Set[str]]: + """ + Inherited from baseapkinfo.py. + Return a dictionary holding the inheritance relationship of classes in + the sample. The dictionary takes a class name as the key and the + set of corresponding superclasses as the value. + + :return: a dictionary taking a class name as the key and the + set of corresponding superclasses as the value. + """ + hierarchy_dict = defaultdict(set) + + class_info_list = self._r2.cmdj("icj") + for class_info in class_info_list: + class_name = class_info["classname"] + class_name = self._convert_type_to_type_signature(class_name) + super_classes = class_info["super"] + + for super_class in super_classes: + hierarchy_dict[class_name].add(super_class) + + return hierarchy_dict + + @functools.cached_property + def subclass_relationships(self) -> Dict[str, Set[str]]: + """ + Inherited from baseapkinfo.py. + Return a dictionary holding the inheritance relationship of classes in + the sample. The dictionary takes a class name as the key + and the corresponding subclasses as the value. 
+ + :return: a dictionary taking a class name as the key and the + corresponding subclasses as the value. + """ + hierarchy_dict = defaultdict(set) + + class_info_list = self._r2.cmdj("icj") + for class_info in class_info_list: + # Normalize the class name the same way superclass_relationships + # does, so both dictionaries use the same naming scheme. + class_name = self._convert_type_to_type_signature( + class_info["classname"] + ) + + # `super` is iterated as a collection of names in + # superclass_relationships; using it directly as a dictionary key + # would fail for an (unhashable) list. + for super_class in class_info["super"]: + hierarchy_dict[super_class].add(class_name) + + return hierarchy_dict + + def _get_method_by_address(self, address: int) -> Optional[MethodObject]: + """ + Find a method via a specified address. + + :param address: an address used to find the corresponding method + :return: the MethodObject of the method in the given address, or None + if no symbol could be resolved there + """ + json_data = self._r2.cmdj(f"is.j @ {address}") + # cmdj may yield None when the output cannot be parsed as JSON; + # report that as "no method" instead of failing on `.get`. + if not json_data: + return None + + symbol = json_data.get("symbols") + if not symbol: + return None + + return self._parse_method_from_isj_obj(symbol) + + def _get_string_by_address(self, address: str) -> str: + """ + Find the content of string via the specified string address. + + :param address: a hexadecimal address string locating the string + :return: the content in the given address + """ + content = self._r2.cmd(f"pfq z @ {int(address, 16)}") + return content + + @staticmethod + def _parse_parameter(parameter: str, p_type: str = "int") -> Any: + """Parse the value of the parameter, trying `p_type` first and + falling back from int (base 16) to float to str. + + :param parameter: the parameter of a bytecode + :param p_type: the type to try first: "int", "float" or "str" + :return: the value of the parameter + """ + if p_type == "int": + try: + parameter = int(parameter, 16) + except (TypeError, ValueError): + return R2Imp._parse_parameter(parameter, "float") + + elif p_type == "float": + try: + parameter = float(parameter) + except (TypeError, ValueError): + return R2Imp._parse_parameter(parameter, "str") + + elif p_type == "str": + parameter = re.sub(r"\.", ";->", parameter, count=1) + # Skip extra parameter. e.g. 
0x18a or space + parameter = parameter.split(" ;")[0] + + return parameter + + def _parse_smali(self, smali: str) -> BytecodeObject: + """ + Convert a Smali code provided by the R2 command `pdfj` into a + BytecodeObject. + + :param smali: a Smali code provided by the R2 command `pdfj` + :raises ValueError: if the Smali code follows an unknown format + :return: a BytecodeObject + """ + if smali == "": + raise ValueError("Argument cannot be empty.") + + if " " not in smali: + return BytecodeObject(smali, None, None) + + mnemonic, args = smali.split(maxsplit=1) # Split into twe parts + + args = [arg.strip() for arg in re.split("[{},]+", args) if arg] + + if mnemonic == "const-string" and args[-1][:2] == "0x": + args[-1] = self._get_string_by_address(args[-1]) + + parameter = None + # Remove the parameter at the last + if args and not args[-1].startswith("v"): + parameter = R2Imp._parse_parameter(args[-1]) + args = args[:-1] + + register_list = [] + # Ranged registers + if len(args) == 1 and (":" in args[0] or ".." in args[0]): + register_list = args[0] + register_list = [ + int(reg[1:]) for reg in re.split("[:.]+", register_list) if reg + ] + + if ".." in args[0]: + register_list = range(register_list[0], register_list[1] + 1) + + # Simple registers + elif len(args) != 0: + try: + register_list = [int(arg[1:]) for arg in args] + except ValueError: + raise ValueError( + f"Cannot parse bytecode. Unknown smali {smali}." 
+ ) + + register_list = [f"v{index}" for index in register_list] + + return BytecodeObject(mnemonic, register_list, parameter) diff --git a/setup.py b/setup.py index 60c5842e..e868ec90 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ "plotly", "rzpipe", "click", + "r2pipe==1.8.0" ] # # "kaleido", diff --git a/tests/core/test_apkinfo.py b/tests/core/test_apkinfo.py index c8d1c873..3bb8506d 100644 --- a/tests/core/test_apkinfo.py +++ b/tests/core/test_apkinfo.py @@ -7,6 +7,7 @@ from quark.core.apkinfo import AndroguardImp from quark.core.interface.baseapkinfo import BaseApkinfo from quark.core.rzapkinfo import RizinImp +from quark.core.r2apkinfo import R2Imp from quark.core.struct.bytecodeobject import BytecodeObject from quark.core.struct.methodobject import MethodObject @@ -28,7 +29,7 @@ def apk_path(): @pytest.fixture( scope="function", - params=((AndroguardImp), (RizinImp)), + params=((AndroguardImp), (RizinImp), (R2Imp)), ) def apkinfo(request, apk_path): Apkinfo, apk_path = request.param, apk_path @@ -37,6 +38,33 @@ def apkinfo(request, apk_path): yield apkinfo +@pytest.fixture( + scope="function", + params=((AndroguardImp), (RizinImp)), +) +def apkinfo_without_R2Imp(request, apk_path): + """Since R2 has some issue, + create this function to skip R2 relevant test for some test functions. + """ + Apkinfo, apk_path = request.param, apk_path + apkinfo = Apkinfo(apk_path) + + yield apkinfo + + +@pytest.fixture( + scope="function", + params=((R2Imp),), +) +def apkinfo_with_R2Imp_only(request, apk_path): + """For testcases involved with R2 core lib. 
+ """ + Apkinfo, apk_path = request.param, apk_path + apkinfo = Apkinfo(apk_path) + + yield apkinfo + + @pytest.fixture(scope="function") def dex_file(): APK_SOURCE = ( @@ -290,7 +318,8 @@ def test_find_method(apkinfo, test_input, expected): assert isinstance(result, list) assert expect_method in result - def test_upperfunc(self, apkinfo): + def test_upperfunc(self, apkinfo_without_R2Imp): + apkinfo = apkinfo_without_R2Imp api = apkinfo.find_method( "Lcom/example/google/service/ContactsHelper;", "", @@ -307,25 +336,8 @@ def test_upperfunc(self, apkinfo): assert expect_function in upper_methods - def test_lowerfunc(self, apkinfo): - method = apkinfo.find_method( - "Lcom/example/google/service/WebServiceCalling;", - "Send", - "(Landroid/os/Handler; Ljava/lang/String;)V", - )[0] - - expect_method = MethodObject( - "Ljava/lang/StringBuilder;", - "append", - "(Ljava/lang/String;)Ljava/lang/StringBuilder;", - ) - expect_offset = 42 - - upper_methods = apkinfo.lowerfunc(method) - - assert (expect_method, expect_offset) in upper_methods - - def test_get_method_bytecode(self, apkinfo): + def test_get_method_bytecode(self, apkinfo_without_R2Imp): + apkinfo = apkinfo_without_R2Imp expected_bytecode_list = [ BytecodeObject( "iput-object", @@ -365,7 +377,8 @@ def test_get_method_bytecode(self, apkinfo): for expected in expected_bytecode_list: assert expected in bytecodes - def test_lowerfunc(self, apkinfo): + def test_lowerfunc(self, apkinfo_without_R2Imp): + apkinfo = apkinfo_without_R2Imp method = apkinfo.find_method( "Lcom/example/google/service/SMSReceiver;", "isContact", @@ -390,3 +403,32 @@ def test_superclass_relationships_with_expected_class(self, apkinfo): upper_set = apkinfo.superclass_relationships[class_name] assert expected_upper_class == upper_set + + + @staticmethod + @pytest.mark.parametrize( + "test_input, expected", + [ + ( + "Landroid/view/KeyEvent;", + str, + ), + ( + 0x3e8, + float, + ), + ( + ("Ljava/lang/StringBuilder;->append(Ljava/lang/String;)" + 
"Ljava/lang/StringBuilder;"), + str, + ), + ( + "str.google.c.a.tc", + str, + ), + ], + ) + def test_parse_parameter(test_input, expected, apkinfo_with_R2Imp_only): + apkinfo = apkinfo_with_R2Imp_only + parsed_param = apkinfo._parse_parameter(test_input) + assert isinstance(parsed_param, expected) diff --git a/tests/core/test_axmlreader.py b/tests/core/test_axmlreader.py index 417b7dc4..fce545d5 100644 --- a/tests/core/test_axmlreader.py +++ b/tests/core/test_axmlreader.py @@ -11,6 +11,16 @@ from quark.core.axmlreader import AxmlReader, ResValue + +@pytest.fixture( + scope="function", + params=(("radare2"), ("rizin")), +) +def core_library(request): + core_lib = request.param + yield core_lib + + def extractManifest(samplePath: PathLike) -> str: folder = Path(samplePath).parent @@ -27,8 +37,8 @@ def MANIFEST_PATH_14d9f(SAMPLE_PATH_14d9f): class TestAxmlReader: @staticmethod - def testIter(MANIFEST_PATH_14d9f) -> None: - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testIter(core_library, MANIFEST_PATH_14d9f) -> None: + axmlReader = AxmlReader(MANIFEST_PATH_14d9f, core_library) expectedTag = {"Address": 3728, "Type": 256, "Prefix": 9, "Uri": 10} tag = next(iter(axmlReader)) @@ -37,23 +47,23 @@ def testIter(MANIFEST_PATH_14d9f) -> None: helper.assertDictEqual(tag, expectedTag) @staticmethod - def testFileSize(MANIFEST_PATH_14d9f): - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testFileSize(core_library, MANIFEST_PATH_14d9f): + axmlReader = AxmlReader(MANIFEST_PATH_14d9f, core_library) assert axmlReader.file_size == 7676 @staticmethod - def testAxmlSize(MANIFEST_PATH_14d9f): - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testAxmlSize(core_library, MANIFEST_PATH_14d9f): + axmlReader = AxmlReader(MANIFEST_PATH_14d9f, core_library) assert axmlReader.axml_size == 7676 @staticmethod - def testGetString(MANIFEST_PATH_14d9f): - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testGetString(core_library, MANIFEST_PATH_14d9f): + axmlReader = 
AxmlReader(MANIFEST_PATH_14d9f, core_library) assert axmlReader.get_string(13) == "manifest" @staticmethod - def testGetAttributes(MANIFEST_PATH_14d9f): - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testGetAttributes(core_library, MANIFEST_PATH_14d9f): + axmlReader = AxmlReader(MANIFEST_PATH_14d9f, core_library) manifestTag = list(axmlReader)[1] expectedAttributes = [ @@ -68,8 +78,8 @@ def testGetAttributes(MANIFEST_PATH_14d9f): assert expectedAttrib == attrib @staticmethod - def testGetXmlTree(MANIFEST_PATH_14d9f): - axmlReader = AxmlReader(MANIFEST_PATH_14d9f) + def testGetXmlTree(core_library, MANIFEST_PATH_14d9f): + axmlReader = AxmlReader(MANIFEST_PATH_14d9f, core_library) xml = axmlReader.get_xml_tree() manifestLabel = xml.getroot() assert len(manifestLabel.findall("uses-sdk")) == 1