Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 133 additions & 8 deletions quark/script/objection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
# This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine
# See the file 'LICENSE' for copying permission.

from collections import namedtuple
import re
from typing import Tuple, Union
from typing import List, Tuple, Union

import requests
from quark.script import Method

SupportedTypes = [int, float, str]
Instance = namedtuple("Instance", ["hashCode"])

def convertMethodToString(method: Method):
def converArgumentsToObjectionFormat(arguments: str):

def convertMethodToString(method: Union[Method, List[str]]):
def convertArgumentsToObjectionFormat(arguments: str):
argList = arguments.split()
argList = map(
lambda a: a.replace("/", ".")
Expand All @@ -21,11 +25,15 @@ def converArgumentsToObjectionFormat(arguments: str):

return ",".join(argList)

# Convert method to a list of string if method is Method type
if isinstance(method, Method):
method = [method.class_name, method.name, method.descriptor]

str_mapping = {
"class_name": method.class_name[1:-1].replace("/", "."),
"method_name": method.name,
"arguments": converArgumentsToObjectionFormat(
method.descriptor[1: method.descriptor.index(")")]
"class_name": method[0][1:-1].replace("/", "."),
"method_name": method[1],
"arguments": convertArgumentsToObjectionFormat(
method[2][1: method[2].index(")")]
),
}
return (
Expand Down Expand Up @@ -56,6 +64,36 @@ def _sendRequest(

return (response.status_code, response.json())

def _getCurrentActivity(self) -> str:
ENDPOINT = "androidHookingGetCurrentActivity"

_, response = self._sendRequest(ENDPOINT)

if response:
return response["activity"]
else:
return None

def getInstances(self, clazz: str) -> List[Instance]:
"""Get instances of the specified class.

:param clazz: the target class
:return: a list of instances
"""
ENDPOINT = "androidHeapGetLiveClassInstances"
data = {
"clazz": clazz,
}

_, response = self._sendRequest(ENDPOINT, data)

if response and isinstance(response, list):
return [
Instance(hashCode=jsonObj["hashcode"]) for jsonObj in response
]
else:
return []

def hookMethod(
self,
method: Union[Method, str],
Expand All @@ -66,7 +104,7 @@ def hookMethod(
):
"""Hook the target method with Objection.

:param method: the tagrget API
:param method: the target API
:param overloadFilter: _description_, defaults to ""
:param watchArgs: Return Args information if True, defaults to False
:param watchBacktrace: Return backtrace information if True, defaults
Expand All @@ -89,3 +127,90 @@ def hookMethod(
}

self._sendRequest(ENDPOINT, data)

def execute(
self,
method: Union[Method, List[str], str],
arguments: List[object] = None,
) -> None:
"""Execute the target method.

:param method: the target method
:param arguments: the arguments passing to the method, defaults to []
"""
if arguments is None:
arguments = []

if isinstance(method, (Method, List)):
method, overloadFilter = convertMethodToString(method)
else:
overloadFilter = ""

clazz, methodName = method.rsplit(".", 1)
instance = next(iter(self.getInstances(clazz)), None)

argStrs = []
for arg in arguments:
if isinstance(arg, str):
argStrs.append(f'"{arg}"')
elif isinstance(arg, Instance):
argStrs.append(f"getInstance({arg.hashCode})")
else:
argStrs.append(str(arg))

if overloadFilter:
overloadFilterStr = ",".join(
[
f'"{argType.strip()}"'
for argType in overloadFilter.split(",")
]
)
else:
overloadFilterStr = None

js = []

js.append(f'let clz = Java.use("{clazz}");')

if overloadFilterStr:
js.append(
(
f'let method = clz["{methodName}"]'
f".overload({overloadFilterStr});"
)
)
else:
js.append(f'let method = clz["{methodName}"];')

if argStrs:
js.append(
f"const result = method.call"
f'({"clazz" if instance is not None else "clz"},'
f'{",".join(argStrs)});'
)
else:
js.append(
f"const result = method.call"
f'({"clazz" if instance is not None else "clz"});'
)

js.append("console.log(result);")

ENDPOINT = "androidHeapEvaluateHandleMethod"

if instance:
data = {
"handle": instance.hashCode,
"js": "\n".join(js),
}

else:
currentActivity = self._getCurrentActivity()
activityInstance = self.getInstances(currentActivity)[0]

data = {
"handle": activityInstance.hashCode,
"js": "\n".join(js),
}

self._sendRequest(ENDPOINT, data)
Loading