diff --git a/quark/script/objection.py b/quark/script/objection.py index 158603fd..beec129a 100644 --- a/quark/script/objection.py +++ b/quark/script/objection.py @@ -2,15 +2,19 @@ # This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine # See the file 'LICENSE' for copying permission. +from collections import namedtuple import re -from typing import Tuple, Union +from typing import List, Tuple, Union import requests from quark.script import Method +SupportedTypes = [int, float, str] +Instance = namedtuple("Instance", ["hashCode"]) -def convertMethodToString(method: Method): - def converArgumentsToObjectionFormat(arguments: str): + +def convertMethodToString(method: Union[Method, List[str]]): + def convertArgumentsToObjectionFormat(arguments: str): argList = arguments.split() argList = map( lambda a: a.replace("/", ".") @@ -21,11 +25,15 @@ def converArgumentsToObjectionFormat(arguments: str): return ",".join(argList) + # Convert method to a list of string if method is Method type + if isinstance(method, Method): + method = [method.class_name, method.name, method.descriptor] + str_mapping = { - "class_name": method.class_name[1:-1].replace("/", "."), - "method_name": method.name, - "arguments": converArgumentsToObjectionFormat( - method.descriptor[1: method.descriptor.index(")")] + "class_name": method[0][1:-1].replace("/", "."), + "method_name": method[1], + "arguments": convertArgumentsToObjectionFormat( + method[2][1: method[2].index(")")] ), } return ( @@ -56,6 +64,36 @@ def _sendRequest( return (response.status_code, response.json()) + def _getCurrentActivity(self) -> str: + ENDPOINT = "androidHookingGetCurrentActivity" + + _, response = self._sendRequest(ENDPOINT) + + if response: + return response["activity"] + else: + return None + + def getInstances(self, clazz: str) -> List[Instance]: + """Get instances of the specified class. + + :param clazz: the target class + :return: a list of instances + """ + ENDPOINT = "androidHeapGetLiveClassInstances" + data = { + "clazz": clazz, + } + + _, response = self._sendRequest(ENDPOINT, data) + + if response and isinstance(response, list): + return [ + Instance(hashCode=jsonObj["hashcode"]) for jsonObj in response + ] + else: + return [] + def hookMethod( self, method: Union[Method, str], @@ -66,7 +104,7 @@ def hookMethod( ): """Hook the target method with Objection. - :param method: the tagrget API + :param method: the target API :param overloadFilter: _description_, defaults to "" :param watchArgs: Return Args information if True, defaults to False :param watchBacktrace: Return backtrace information if True, defaults @@ -89,3 +127,90 @@ def hookMethod( } self._sendRequest(ENDPOINT, data) + + def execute( + self, + method: Union[Method, List[str], str], + arguments: List[object] = None, + ) -> None: + """Execute the target method. + + :param method: the target method + :param arguments: the arguments passing to the method, defaults to [] + """ + if arguments is None: + arguments = [] + + if isinstance(method, (Method, List)): + method, overloadFilter = convertMethodToString(method) + else: + overloadFilter = "" + + clazz, methodName = method.rsplit(".", 1) + instance = next(iter(self.getInstances(clazz)), None) + + argStrs = [] + for arg in arguments: + if isinstance(arg, str): + argStrs.append(f'"{arg}"') + elif isinstance(arg, Instance): + argStrs.append(f"getInstance({arg.hashCode})") + else: + argStrs.append(str(arg)) + + if overloadFilter: + overloadFilterStr = ",".join( + [ + f'"{argType.strip()}"' + for argType in overloadFilter.split(",") + ] + ) + else: + overloadFilterStr = None + + js = [] + + js.append(f'let clz = Java.use("{clazz}");') + + if overloadFilterStr: + js.append( + ( + f'let method = clz["{methodName}"]' + f".overload({overloadFilterStr});" + ) + ) + else: + js.append(f'let method = clz["{methodName}"];') + + if argStrs: + js.append( + f"const result = method.call" + f'({"clazz" if instance is not None else "clz"},' + f'{",".join(argStrs)});' + ) + else: + js.append( + f"const result = method.call" + f'({"clazz" if instance is not None else "clz"});' + ) + + js.append("console.log(result);") + + ENDPOINT = "androidHeapEvaluateHandleMethod" + + if instance: + data = { + "handle": instance.hashCode, + "js": "\n".join(js), + } + + else: + currentActivity = self._getCurrentActivity() + activityInstance = self.getInstances(currentActivity)[0] + + data = { + "handle": activityInstance.hashCode, + "js": "\n".join(js), + } + + self._sendRequest(ENDPOINT, data) diff --git a/tests/script/test_objection.py b/tests/script/test_objection.py index 58064aed..82a10f30 100644 --- a/tests/script/test_objection.py +++ b/tests/script/test_objection.py @@ -1,13 +1,49 @@ # -*- coding: utf-8 -*- # This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine # See the file 'LICENSE' for copying permission. +import pytest -from unittest import TestCase -from unittest.mock import patch +from unittest.mock import Mock, patch from quark.core.struct.methodobject import MethodObject from quark.script import Method -from quark.script.objection import Objection, convertMethodToString +from quark.script.objection import Objection, convertMethodToString, Instance + + +@pytest.fixture(scope="function") +def mockedRequestPost(): + with patch("requests.post") as mock: + yield mock + + +@pytest.fixture(scope="function") +def mockedGetInstances(): + with patch("quark.script.objection.Objection.getInstances") as mock: + yield mock + + +@pytest.fixture(scope="function") +def mockedGetCurrentActivity(): + with patch("quark.script.objection.Objection._getCurrentActivity") as mock: + yield mock + + +@pytest.fixture(scope="function") +def responseForGetInstancesSuccess(): + response = Mock() + response.status_code.return_value = 200 + response.json.return_value = [{"hashcode": 11111111}] + + yield response + + +@pytest.fixture(scope="function") +def responseForGatInstancesFail(): + response = Mock() + response.status_code.return_value = 200 + response.json.return_value = {"message": "Error"} + + yield response def testCovertMethodToStringWithClasses(): @@ -79,7 +115,29 @@ def testCovertMethodToStringWithArrayAndClass(): class TestObjection: @staticmethod - def testHookMethodWithMethodObject(): + def testGetInstancesSuccess( + mockedRequestPost, responseForGetInstancesSuccess + ): + mockedRequestPost.return_value = responseForGetInstancesSuccess + obj = Objection("127.0.0.1:8888") + clazz = "Lcom/google/progress/WifiCheckTask;" + + instances = obj.getInstances(clazz) + + assert instances == [Instance(11111111)] + + @staticmethod + def testGetInstancesFail(mockedRequestPost, responseForGatInstancesFail): + mockedRequestPost.return_value = responseForGatInstancesFail + obj = Objection("127.0.0.1:8888") + clazz = "Lcom/google/progress/WifiCheckTask;" + + instances = obj.getInstances(clazz) + + assert instances == [] + + @staticmethod + def testHookMethodWithMethodObject(mockedRequestPost: Mock): obj = Objection("127.0.0.1:8888") method = Method( None, @@ -90,30 +148,120 @@ def testHookMethodWithMethodObject(): ), ) - with patch("requests.post") as mocked_post: - obj.hookMethod( - method, watchArgs=True, watchBacktrace=True, watchRet=True - ) - - expectedJson = { - "pattern": ( - "com.google.progress.WifiCheckTask" - ".checkWifiCanOrNotConnectServer" - ), - "overloadFilter": "[Ljava.lang.String;", - "watchArgs": True, - "watchBacktrace": True, - "watchRet": True, - } - - args, keyworkArgs = mocked_post.call_args_list[0] - - assert ( - "http://127.0.0.1:8888/rpc/invoke/androidHookingWatchMethod" - in args - ) - - TestCase().assertDictEqual(keyworkArgs["json"], expectedJson) - TestCase().assertDictEqual( - keyworkArgs["headers"], {"Content-type": "application/json"} - ) + obj.hookMethod( + method, watchArgs=True, watchBacktrace=True, watchRet=True + ) + + expectedJson = { + "pattern": ( + "com.google.progress.WifiCheckTask" + ".checkWifiCanOrNotConnectServer" + ), + "overloadFilter": "[Ljava.lang.String;", + "watchArgs": True, + "watchBacktrace": True, + "watchRet": True, + } + + mockedRequestPost.assert_called_once_with( + "http://127.0.0.1:8888/rpc/invoke/androidHookingWatchMethod", + json=expectedJson, + headers={"Content-type": "application/json"}, + ) + + @staticmethod + def testExecuteWithAnInstance(mockedRequestPost, mockedGetInstances): + mockedGetInstances.return_value = [Instance(11111111)] + obj = Objection("127.0.0.1:8888") + method = Method( + None, + MethodObject( + "La/b/clazz;", + "methodName", + "()Z", + ), + ) + + obj.execute(method) + + expectedJson = { + "handle": 11111111, + "js": ( + 'let clz = Java.use("a.b.clazz");\n' + 'let method = clz["methodName"];\n' + "const result = method.call(clazz);\n" + "console.log(result);" + ), + } + + mockedRequestPost.assert_called_once_with( + "http://127.0.0.1:8888/rpc/invoke/androidHeapEvaluateHandleMethod", + json=expectedJson, + headers={"Content-type": "application/json"}, + ) + + @staticmethod + def testExecuteWithoutAnInstance( + mockedRequestPost, mockedGetInstances, mockedGetCurrentActivity + ): + mockedGetCurrentActivity.return_value = "activity" + mockedGetInstances.side_effect = [[], [Instance(22222222)]] + obj = Objection("127.0.0.1:8888") + method = Method( + None, + MethodObject( + "La/b/clazz;", + "methodName", + "()Z", + ), + ) + + obj.execute(method) + + expectedJson = { + "handle": 22222222, + "js": ( + 'let clz = Java.use("a.b.clazz");\n' + 'let method = clz["methodName"];\n' + "const result = method.call(clz);\n" + "console.log(result);" + ), + } + + mockedRequestPost.assert_called_once_with( + "http://127.0.0.1:8888/rpc/invoke/androidHeapEvaluateHandleMethod", + json=expectedJson, + headers={"Content-type": "application/json"}, + ) + + @staticmethod + def testExecuteWithArguments(mockedRequestPost, mockedGetInstances): + mockedGetInstances.return_value = [Instance(11111111)] + obj = Objection("127.0.0.1:8888") + method = Method( + None, + MethodObject( + "La/b/clazz;", + "methodName", + "(Ljava/lang/String; Ljava/lang/String)Z", + ), + ) + + obj.execute(method, ["Arg1", "Arg2"]) + + expectedJson = { + "handle": 11111111, + "js": ( + 'let clz = Java.use("a.b.clazz");\n' + 'let method = clz["methodName"]' + '.overload("java.lang.String","java.lang.String");\n' + 'const result = method.call(clazz,"Arg1","Arg2");\n' + "console.log(result);" + ), + } + + mockedRequestPost.assert_called_once_with( + "http://127.0.0.1:8888/rpc/invoke/androidHeapEvaluateHandleMethod", + json=expectedJson, + headers={"Content-type": "application/json"}, + )