diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 7aefeb07..6ab8c8e1 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -31,8 +31,9 @@ jobs: # Install Rizin - sudo git clone --branch v0.3.4 https://github.com/rizinorg/rizin /opt/rizin/ + sudo git clone https://github.com/rizinorg/rizin /opt/rizin/ cd /opt/rizin/ + sudo git checkout de8a5cac5532845643a52d1231b17a7b34feb50a meson build ninja -C build sudo ninja -C build install diff --git a/quark/core/rzapkinfo.py b/quark/core/rzapkinfo.py index 446ab39a..7e621248 100644 --- a/quark/core/rzapkinfo.py +++ b/quark/core/rzapkinfo.py @@ -10,7 +10,7 @@ import zipfile from collections import defaultdict, namedtuple from os import PathLike -from typing import Dict, Generator, List, Optional, Set, Union +from typing import Dict, Generator, List, Optional, Set, Tuple, Union import rzpipe @@ -32,15 +32,6 @@ "long": "J", "float": "F", "double": "D", - "Boolean": "Ljava/lang/Boolean;", - "Byte": "Ljava/lang/Byte;", - "Character": "Ljava/lang/Character;", - "Short": "Ljava/lang/Short;", - "Integer": "Ljava/lang/Integer;", - "Long": "Ljava/lang/Long;", - "Float": "Ljava/lang/Float;", - "Double": "Ljava/lang/Double;", - "String": "Ljava/lang/String;", } RIZIN_ESCAPE_CHAR_LIST = ["<", ">", "$"] @@ -84,17 +75,44 @@ def __init__( @functools.lru_cache def _get_rz(self, index): + """ + Return a Rizin object that opens the specified Dex file. + + :param index: an index indicating which Dex file should the returned + object open + :return: a Rizin object opening the specified Dex file + """ rz = rzpipe.open(self._dex_list[index]) rz.cmd("aa") return rz def _convert_type_to_type_signature(self, raw_type: str): + """ + Convert a Java type in the format of the Java language into the + one in the format of the Java VM type signature. + + For example, + + `int` will be converted into the Java VM type signature `I`. + + `long` will be converted into the Java VM type signature `J`.
+ + `String...` will be converted into the Java VM type signature + `[Ljava/lang/String;`. + + :param raw_type: a type in the format of the Java language + :return: a type in the format of the Java VM type signature + """ + if not raw_type: + return raw_type + if raw_type.endswith("[]"): return "[" + self._convert_type_to_type_signature(raw_type[:-2]) if raw_type.startswith("["): return "[" + self._convert_type_to_type_signature(raw_type[1:]) + if "..." in raw_type: + index = raw_type.index("...") + return "[" + self._convert_type_to_type_signature(raw_type[:index]) + if raw_type in PRIMITIVE_TYPE_MAPPING: return PRIMITIVE_TYPE_MAPPING[raw_type] @@ -103,121 +121,154 @@ def _convert_type_to_type_signature(self, raw_type: str): raw_type = raw_type.replace("_", "$") return "L" + raw_type + ";" - return raw_type + return "Ljava/lang/" + raw_type + ";" @staticmethod def _escape_str_in_rizin_manner(raw_str: str): + """ + Convert characters with special meanings in Rizin into `_`. + For now, these characters are `<`, `>` and `$`. + + :param raw_str: a string that may consist of characters with special + meanings. + :return: a new string contains no characters with special meanings. + """ for c in RIZIN_ESCAPE_CHAR_LIST: raw_str = raw_str.replace(c, "_") return raw_str - @functools.lru_cache - def _get_methods_classified(self, dexindex): - rz = self._get_rz(dexindex) + def _parse_method_from_isj_obj(self, json_obj, dexindex): + """ + Parse a JSON object provided by the Rizin command `isj` or `is.j` into + an instance of MethodObject. 
+ + :param json_obj: a JSON object provided by the Rizin command `isj` or + `is.j` + :param dexindex: an index indicating from which Dex file the JSON + object is generated + :return: an instance of MethodObject + """ + if json_obj.get("type") not in ["FUNC", "METH"]: + return None - method_json_list = rz.cmdj("isj") - method_dict = defaultdict(list) - for json_obj in method_json_list: - if json_obj.get("type") not in ["FUNC", "METH"]: - continue + # -- Descriptor -- + full_method_name = json_obj["name"] + raw_argument_str = next( + re.finditer("\\(.*\\).*", full_method_name), None + ) + if raw_argument_str is None: + return None + + raw_argument_str = raw_argument_str.group(0) + + if raw_argument_str.endswith(")"): + # Convert Java language type to JVM type signature - # -- Descriptor -- - full_method_name = json_obj["name"] - raw_argument_str = next( - re.finditer("\\(.*\\).*", full_method_name), None + # Parse the arguments + raw_argument_str = raw_argument_str[1:-1] + arguments = [ + self._convert_type_to_type_signature(arg) + for arg in raw_argument_str.split(", ") + ] + + # Parse the return type + return_type = next( + re.finditer( + "[A-Za-zL][A-Za-z0-9L/\\;[\\]$.]+ ", full_method_name + ), + None, + ) + if return_type is None: + print(f"Unresolved method signature: {full_method_name}") + return None + return_type = return_type.group(0).strip() + + # Convert + raw_argument_str = ( + "(" + + " ".join(arguments) + + ")" + + self._convert_type_to_type_signature(return_type) ) - if raw_argument_str is None: - continue - raw_argument_str = raw_argument_str.group(0) - if raw_argument_str.endswith(")"): - # Convert Java lauguage type to JVM type signature + descriptor = descriptor_to_androguard_format(raw_argument_str) - # Parse the arguments - raw_argument_str = raw_argument_str[1:-1] - arguments = [ - self._convert_type_to_type_signature(arg) - for arg in raw_argument_str.split(", ") - ] + # -- Method name -- + method_name = json_obj["realname"] - # Parse the
return type - return_type = next( - re.finditer( - "[A-Za-zL][A-Za-z0-9L/\\;[\\]$.]+ ", full_method_name - ), - None, - ) - if return_type is None: - print(f"Unresolved method signature: {full_method_name}") - continue - return_type = return_type.group(0).strip() - - # Convert - raw_argument_str = ( - "(" - + " ".join(arguments) - + ")" - + self._convert_type_to_type_signature(return_type) - ) + # -- Is imported -- + is_imported = json_obj["is_imported"] - descriptor = descriptor_to_androguard_format(raw_argument_str) + # -- Class name -- + # Test if the class name is truncated + escaped_method_name = self._escape_str_in_rizin_manner(method_name) + if escaped_method_name.endswith("_"): + escaped_method_name = escaped_method_name[:-1] - # -- Method name -- - method_name = json_obj["realname"] + flag_name = json_obj["flagname"] - # -- Is imported -- - is_imported = json_obj["is_imported"] + # sym.imp.clone doesn't belong to a class + if flag_name == "sym.imp.clone": + method = MethodObject( + class_name="", + name="clone", + descriptor="()Ljava/lang/Object;", + cache=RizinCache(json_obj["vaddr"], dexindex, is_imported), + ) + return method - # -- Class name -- - # Test if the class name is truncated - escaped_method_name = self._escape_str_in_rizin_manner(method_name) - if escaped_method_name.endswith("_"): - escaped_method_name = escaped_method_name[:-1] + if escaped_method_name not in flag_name: + logging.warning( + f"The class name may be truncated: {json_obj['flagname']}" + ) - flag_name = json_obj["flagname"] + # Drop the method name + match = None + for match in re.finditer("_+[A-Za-z]+", flag_name): + pass + if match is None: + logging.warning(f"Skip the damaged flag: {json_obj['flagname']}") + return None + match = match.group(0) + flag_name = flag_name[: flag_name.rfind(match)] - # sym.imp.clone doesn't belong to a class - if flag_name == "sym.imp.clone": - method = MethodObject( - class_name="", - name="clone", - descriptor="()Ljava/lang/Object;", - 
cache=RizinCache(json_obj["vaddr"], dexindex, is_imported), - ) - method_dict[""].append(method) - continue + # Drop the prefixes sym. and imp. + while flag_name.startswith("sym.") or flag_name.startswith("imp."): + flag_name = flag_name[4:] - if escaped_method_name not in flag_name: - logging.warning( - f"The class name may be truncated: {json_obj['flagname']}" - ) + class_name = self._convert_type_to_type_signature(flag_name) - # Drop the method name - match = None - for match in re.finditer("_+[A-Za-z]+", flag_name): - pass - if match is None: - logging.warning( - f"Skip the damaged flag: {json_obj['flagname']}" - ) - continue - match = match.group(0) - flag_name = flag_name[: flag_name.rfind(match)] + # Append the method + method = MethodObject( + class_name=class_name, + name=method_name, + descriptor=descriptor, + cache=RizinCache(json_obj["vaddr"], dexindex, is_imported), + ) - # Drop the prefixes sym. and imp. - while flag_name.startswith("sym.") or flag_name.startswith("imp."): - flag_name = flag_name[4:] + return method - class_name = self._convert_type_to_type_signature(flag_name) + @functools.lru_cache + def _get_methods_classified(self, dexindex): + """ + Parse all methods in the specified Dex and convert them into a + dictionary. The dictionary takes their belonging classes as the keys. + Then, it categorizes them into lists. + + :param dexindex: an index indicating which Dex file should this method + parse + :return: a dictionary taking a class name as the key and a list of + MethodObject as the corresponding value. 
+ """ rz = self._get_rz(dexindex) - # Append the method - method = MethodObject( - class_name=class_name, - name=method_name, - descriptor=descriptor, - cache=RizinCache(json_obj["vaddr"], dexindex, is_imported), - ) - method_dict[class_name].append(method) + method_json_list = rz.cmdj("isj") + method_dict = defaultdict(list) + for json_obj in method_json_list: + method = self._parse_method_from_isj_obj(json_obj, dexindex) + + if method: + method_dict[method.class_name].append(method) # Remove duplicates for class_name, method_list in method_dict.items(): @@ -227,6 +278,12 @@ def _get_methods_classified(self, dexindex): @functools.cached_property def permissions(self) -> List[str]: + """ + Inherited from baseapkinfo.py. + Return the permissions used by the sample. + + :return: a list of permissions. + """ axml = AxmlReader(self._manifest) permission_list = set() @@ -243,6 +300,12 @@ def permissions(self) -> List[str]: @property def android_apis(self) -> Set[MethodObject]: + """ + Inherited from baseapkinfo.py. + Return all Android native APIs used by the sample. + + :return: a set of MethodObjects + """ return { method for method in self.all_methods @@ -251,10 +314,27 @@ def android_apis(self) -> Set[MethodObject]: @property def custom_methods(self) -> Set[MethodObject]: - return {method for method in self.all_methods if not method.cache.is_imported} + """ + Inherited from baseapkinfo.py. + Return all custom methods declared by the sample. + + :return: a set of MethodObjects + """ + return { + method + for method in self.all_methods + if not method.cache.is_imported + } @functools.cached_property def all_methods(self) -> Set[MethodObject]: + """ + Inherited from baseapkinfo.py. + Return all methods including Android native APIs and custom methods + declared in the sample.
+ + :return: a set of MethodObjects + """ method_set = set() for dex_index in range(self._number_of_dex): for method_list in self._get_methods_classified(dex_index).values(): @@ -268,6 +348,19 @@ def find_method( method_name: Optional[str] = ".*", descriptor: Optional[str] = ".*", ) -> MethodObject: + """ + Inherited from baseapkinfo.py. + Find a method with the given class name, method name, and descriptor. + + :param class_name: the class name of the target method. Defaults to + ".*" + :param method_name: the method name of the target method. Defaults to + ".*" + :param descriptor: the descriptor of the target method. Defaults to + ".*" + :return: a MethodObject of the target method + """ + def method_filter(method): return (not method_name or method_name == method.name) and ( not descriptor or descriptor == method.descriptor @@ -285,6 +378,14 @@ def method_filter(method): @functools.lru_cache def upperfunc(self, method_object: MethodObject) -> Set[MethodObject]: + """ + Inherited from baseapkinfo.py. + Find the xrefs to the specified method. + + :param method_object: a target method which the returned methods + should call + :return: a set of MethodObjects + """ cache = method_object.cache r2 = self._get_rz(cache.dexindex) @@ -296,66 +397,76 @@ def upperfunc(self, method_object: MethodObject) -> Set[MethodObject]: if xref["type"] != "CALL": continue - if "fcn_addr" in xref: - matched_method = self._get_method_by_address(xref["fcn_addr"]) + if "from" in xref: + matched_method = self._get_method_by_address(xref["from"]) if not matched_method: logging.debug( - f"Cannot identify function at {xref['fcn_addr']}." + f"Cannot identify function at {xref['from']}." ) continue upperfunc_set.add(matched_method) else: logging.debug( - f"Key from was not found at searching" - f" upper methods for {method_object}." + f"Key from was not found when trying to search" + f" upper methods of {method_object}."
) return upperfunc_set @functools.lru_cache - def lowerfunc(self, method_object: MethodObject) -> Set[MethodObject]: + def lowerfunc( + self, method_object: MethodObject + ) -> Set[Tuple[MethodObject, int]]: + """ + Inherited from baseapkinfo.py. + Find the xrefs from the specified method. + + :param method_object: a target method used to find what methods it + calls + :return: a set of tuples consisting of the called method and the + offset of the invocation + """ cache = method_object.cache - r2 = self._get_rz(cache.dexindex) - - xrefs = r2.cmdj(f"axffj @ {cache.address}") + rz = self._get_rz(cache.dexindex) - if not xrefs: - return set() + instruct_flow = rz.cmdj(f"pdfj @ {cache.address}")["ops"] - lowerfunc_set = set() - for xref in xrefs: - if xref["type"] != "CALL": - continue + lowerfunc_list = [] + for ins in instruct_flow: + if "xrefs_from" in ins: + call_xrefs = ( + xref + for xref in ins["xrefs_from"] + if xref["type"] == "CALL" + ) - if "to" in xref: - matched_method = self._get_method_by_address(xref["to"]) - if not matched_method: - logging.debug( - f"Cannot identify function at {xref['fcn_addr']}." - ) - continue + for call_xref in call_xrefs: + lowerfunc = self._get_method_by_address(call_xref["addr"]) + if not lowerfunc: + logging.debug( + f"Cannot identify function at {call_xref['addr']}." + ) + continue - offset = xref["from"] - cache.address + offset = ins["offset"] - cache.address - lowerfunc_set.add( - ( - matched_method, - offset, - ) - ) - else: - logging.debug( - f"Key from was not found at searching" - f" upper methods for {method_object}." - ) + lowerfunc_list.append((lowerfunc, offset)) - return lowerfunc_set + return lowerfunc_list def get_method_bytecode( self, method_object: MethodObject ) -> Generator[BytecodeObject, None, None]: + """ + Inherited from baseapkinfo.py. + Return the bytecodes of the specified method.
+ + :param method_object: a target method to get the corresponding + bytecodes + :yield: a generator of BytecodeObjects + """ cache = method_object.cache if not cache.is_imported: @@ -369,6 +480,12 @@ def get_method_bytecode( yield self._parse_smali(ins["disasm"]) def get_strings(self) -> Set[str]: + """ + Inherited from baseapkinfo.py. + Return all strings in the sample. + + :return: a set of strings + """ strings = set() for dex_index in range(self._number_of_dex): rz = self._get_rz(dex_index) @@ -386,6 +503,19 @@ def get_wrapper_smali( first_method: MethodObject, second_method: MethodObject, ) -> Dict[str, Union[BytecodeObject, str]]: + """ + Inherited from baseapkinfo.py. + Find the invocations that call two specified methods, first_method + and second_method, respectively. Then, return a dictionary storing + the corresponding bytecodes and hex values. + + :param parent_method: a parent method to scan + :param first_method: the first method called by the parent method + :param second_method: the second method called by the parent method + :return: a dictionary storing the corresponding bytecodes and hex + values. + """ + def convert_bytecode_to_list(bytecode): return [bytecode.mnemonic] + bytecode.registers + [bytecode.parameter] @@ -449,6 +579,15 @@ def convert_bytecode_to_list(bytecode): @functools.cached_property def superclass_relationships(self) -> Dict[str, Set[str]]: + """ + Inherited from baseapkinfo.py. + Return a dictionary holding the inheritance relationship of classes in + the sample. The dictionary takes a class name as the key and the + corresponding superclass as the value. + + :return: a dictionary taking a class name as the key and the + corresponding superclass as the value. 
+ """ hierarchy_dict = defaultdict(set) for dex_index in range(self._number_of_dex): @@ -466,6 +605,16 @@ def superclass_relationships(self) -> Dict[str, Set[str]]: @functools.cached_property def subclass_relationships(self) -> Dict[str, Set[str]]: + """ + Inherited from baseapkinfo.py. + Return a dictionary holding the inheritance relationship of classes in + the sample. + The dictionary takes a class name as the key and the corresponding + subclasses as the value. + + :return: a dictionary taking a class name as the key and the + corresponding subclasses as the value. + """ hierarchy_dict = defaultdict(set) for dex_index in range(self._number_of_dex): @@ -482,15 +631,32 @@ def subclass_relationships(self) -> Dict[str, Set[str]]: return hierarchy_dict def _get_method_by_address(self, address: int) -> MethodObject: - if address < 0: - return None + """ + Find a method via a specified address. + + :param address: an address used to find the corresponding method + :return: the MethodObject of the method in the given address + """ + dexindex = 0 - for method in self.all_methods: - if method.cache.address == address: - return method + rz = self._get_rz(dexindex) + json_array = rz.cmdj(f"is.j @ {address}") + + if json_array: + return self._parse_method_from_isj_obj(json_array[0], dexindex) + else: + return None @staticmethod def _parse_smali(smali: str) -> BytecodeObject: + """ + Convert a Smali code provided by the Rizin command `pdfj` into a + BytecodeObject.
+ + :param smali: a Smali code provided by the Rizin command `pdfj` + :raises ValueError: if the Smali code follows an unknown format + :return: a BytecodeObject + """ if smali == "": raise ValueError("Argument cannot be empty.") diff --git a/quark/utils/tools.py b/quark/utils/tools.py index eb3c3eb7..60b87c8f 100644 --- a/quark/utils/tools.py +++ b/quark/utils/tools.py @@ -43,12 +43,22 @@ def contains(subset_to_check, target_list): def descriptor_to_androguard_format(descriptor): + """ + Insert a space between the arguments of the given descriptor. + + :param descriptor: a descriptor whose arguments may or may not be + separated by spaces + :raises ValueError: if the descriptor is not surrounded by + parentheses + :return: a descriptor with arguments separated by spaces + """ + if "(" not in descriptor or ")" not in descriptor: raise ValueError(f"Invalid descriptor. {descriptor}") delimiter = descriptor.index(")") - arg_str = descriptor[:delimiter] + arg_str = descriptor[1:delimiter] args = re.findall(r"L.+?;|[ZBCSIJFD]|\[", arg_str) new_descriptor = "(" + " ".join(args) + descriptor[delimiter:] diff --git a/tests/core/test_apkinfo.py b/tests/core/test_apkinfo.py index e8482fa2..0d3a8f64 100644 --- a/tests/core/test_apkinfo.py +++ b/tests/core/test_apkinfo.py @@ -120,7 +120,7 @@ def test_android_apis(self, apkinfo): if apkinfo.core_library == "androguard": assert len(apkinfo.android_apis) == 1270 elif apkinfo.core_library == "rizin": - assert len(apkinfo.android_apis) == 1269 + assert len(apkinfo.android_apis) == 1438 assert api.issubset(apkinfo.android_apis) def test_custom_methods(self, apkinfo): @@ -139,7 +139,7 @@ def test_custom_methods(self, apkinfo): if apkinfo.core_library == "androguard": assert len(apkinfo.custom_methods) == 3999 elif apkinfo.core_library == "rizin": - assert len(apkinfo.custom_methods) == 3990 + assert len(apkinfo.custom_methods) == 3999 assert test_custom_method.issubset(apkinfo.custom_methods) def test_all_methods(self, apkinfo): @@ 
-159,7 +159,7 @@ def test_all_methods(self, apkinfo): if apkinfo.core_library == "androguard": assert len(apkinfo.all_methods) == 5452 elif apkinfo.core_library == "rizin": - assert len(apkinfo.all_methods) == 5260 + assert len(apkinfo.all_methods) == 5451 assert test_custom_method.issubset(apkinfo.all_methods)