Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from dex.command.commands.DexFinishTest import DexFinishTest
from dex.command.commands.DexUnreachable import DexUnreachable
from dex.command.commands.DexWatch import DexWatch
from dex.command.commands.DexStepFunction import DexStepFunction
from dex.command.commands.DexContinue import DexContinue
from dex.utils import Timer
from dex.utils.Exceptions import CommandParseError, DebuggerException

Expand All @@ -59,6 +61,8 @@ def _get_valid_commands():
DexFinishTest.get_name(): DexFinishTest,
DexUnreachable.get_name(): DexUnreachable,
DexWatch.get_name(): DexWatch,
DexStepFunction.get_name(): DexStepFunction,
DexContinue.get_name(): DexContinue,
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dex.debugger.DebuggerBase import DebuggerBase
from dex.utils.Exceptions import DebuggerException
from dex.utils.Timeout import Timeout

from dex.dextIR import LocIR

class BreakpointRange:
"""A range of breakpoints and a set of conditions.
Expand Down Expand Up @@ -51,6 +51,9 @@ def __init__(
values: list,
hit_count: int,
finish_on_remove: bool,
is_continue: bool = False,
function: str = None,
addr: str = None,
):
self.expression = expression
self.path = path
Expand All @@ -60,6 +63,72 @@ def __init__(
self.max_hit_count = hit_count
self.current_hit_count = 0
self.finish_on_remove = finish_on_remove
self.is_continue = is_continue
self.function = function
self.addr = addr

def limit_steps(
expression: str,
path: str,
range_from: int,
range_to: int,
values: list,
hit_count: int,
):
return BreakpointRange(
expression,
path,
range_from,
range_to,
values,
hit_count,
False,
)

def finish_test(
expression: str, path: str, on_line: int, values: list, hit_count: int
):
return BreakpointRange(
expression,
path,
on_line,
on_line,
values,
hit_count,
True,
)

def continue_from_to(
expression: str,
path: str,
from_line: int,
to_line: int,
values: list,
hit_count: int,
):
return BreakpointRange(
expression,
path,
from_line,
to_line,
values,
hit_count,
finish_on_remove=False,
is_continue=True,
)

def step_function(function: str, path: str, hit_count: int):
return BreakpointRange(
None,
path,
None,
None,
None,
hit_count,
finish_on_remove=False,
is_continue=False,
function=function,
)

def has_conditions(self):
return self.expression is not None
Expand Down Expand Up @@ -96,34 +165,40 @@ def __init__(self, context, step_collection):
def _build_bp_ranges(self):
commands = self.step_collection.commands
self._bp_ranges = []
try:
limit_commands = commands["DexLimitSteps"]
for lc in limit_commands:
bpr = BreakpointRange(
lc.expression,
lc.path,
lc.from_line,
lc.to_line,
lc.values,
lc.hit_count,
False,
)
self._bp_ranges.append(bpr)
except KeyError:

cond_controller_cmds = ["DexLimitSteps", "DexStepFunction", "DexContinue"]
if not any(c in commands for c in cond_controller_cmds):
raise DebuggerException(
"Missing DexLimitSteps commands, cannot conditionally step."
f"No conditional commands {cond_controller_cmds}, cannot conditionally step."
)

if "DexLimitSteps" in commands:
for c in commands["DexLimitSteps"]:
bpr = BreakpointRange.limit_steps(
c.expression,
c.path,
c.from_line,
c.to_line,
c.values,
c.hit_count,
)
self._bp_ranges.append(bpr)
if "DexFinishTest" in commands:
finish_commands = commands["DexFinishTest"]
for ic in finish_commands:
bpr = BreakpointRange(
ic.expression,
ic.path,
ic.on_line,
ic.on_line,
ic.values,
ic.hit_count + 1,
True,
for c in commands["DexFinishTest"]:
bpr = BreakpointRange.finish_test(
c.expression, c.path, c.on_line, c.values, c.hit_count + 1
)
self._bp_ranges.append(bpr)
if "DexContinue" in commands:
for c in commands["DexContinue"]:
bpr = BreakpointRange.continue_from_to(
c.expression, c.path, c.from_line, c.to_line, c.values, c.hit_count
)
self._bp_ranges.append(bpr)
if "DexStepFunction" in commands:
for c in commands["DexStepFunction"]:
bpr = BreakpointRange.step_function(
c.get_function(), c.path, c.hit_count
)
self._bp_ranges.append(bpr)

Expand All @@ -138,6 +213,9 @@ def _set_leading_bps(self):
bpr.path, bpr.range_from, cond_expr
)
self._leading_bp_handles[id] = bpr
elif bpr.function is not None:
id = self.debugger.add_function_breakpoint(bpr.function)
self._leading_bp_handles[id] = bpr
else:
# Add an unconditional breakpoint.
id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
Expand All @@ -163,6 +241,9 @@ def _run_debugger_custom(self, cmdline):
timed_out = False
total_timeout = Timeout(self.context.options.timeout_total)

step_function_backtraces: list[list[str]] = []
self.instr_bp_ids = set()

while not self.debugger.is_finished:
breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
while self.debugger.is_running and not timed_out:
Expand All @@ -185,21 +266,26 @@ def _run_debugger_custom(self, cmdline):
break

step_info = self.debugger.get_step_info(self._watches, self._step_index)
backtrace = None
if step_info.current_frame:
self._step_index += 1
update_step_watches(
step_info, self._watches, self.step_collection.commands
)
self.step_collection.new_step(self.context, step_info)
backtrace = [f.function for f in step_info.frames]

record_step = False
debugger_continue = False
bp_to_delete = []
for bp_id in self.debugger.get_triggered_breakpoint_ids():
try:
# See if this is one of our leading breakpoints.
bpr = self._leading_bp_handles[bp_id]
record_step = True
except KeyError:
# This is a trailing bp. Mark it for removal.
bp_to_delete.append(bp_id)
if bp_id in self.instr_bp_ids:
self.instr_bp_ids.remove(bp_id)
else:
record_step = True
continue

bpr.add_hit()
Expand All @@ -208,17 +294,93 @@ def _run_debugger_custom(self, cmdline):
exit_desired = True
bp_to_delete.append(bp_id)
del self._leading_bp_handles[bp_id]
# Add a range of trailing breakpoints covering the lines
# requested in the DexLimitSteps command. Ignore first line as
# that's covered by the leading bp we just hit and include the
# final line.
for line in range(bpr.range_from + 1, bpr.range_to + 1):
self.debugger.add_breakpoint(bpr.path, line)

if bpr.function is not None:
if step_info.frames:
# Add this backtrace to the stack. While the current
# backtrace matches the top of the stack we'll step,
# and while there's a backtrace in the stack that
# is a subset of the current backtrace we'll step-out.
if (
len(step_function_backtraces) == 0
or backtrace != step_function_backtraces[-1]
):
step_function_backtraces.append(backtrace)

# Add an address breakpoint so we don't fall out
# the end of nested DexStepFunctions with a DexContinue.
addr = self.debugger.get_pc(frame_idx=1)
instr_id = self.debugger.add_instruction_breakpoint(addr)
# Note the breakpoint so we don't log the source location
# it in the trace later.
self.instr_bp_ids.add(instr_id)

elif bpr.is_continue:
debugger_continue = True
if bpr.range_to is not None:
self.debugger.add_breakpoint(bpr.path, bpr.range_to)

else:
# Add a range of trailing breakpoints covering the lines
# requested in the DexLimitSteps command. Ignore first line as
# that's covered by the leading bp we just hit and include the
# final line.
for line in range(bpr.range_from + 1, bpr.range_to + 1):
id = self.debugger.add_breakpoint(bpr.path, line)

# Remove any trailing or expired leading breakpoints we just hit.
self.debugger.delete_breakpoints(bp_to_delete)

debugger_next = False
debugger_out = False
if not debugger_continue and step_info.current_frame and step_info.frames:
while len(step_function_backtraces) > 0:
match_subtrace = False # Backtrace contains a target trace.
match_trace = False # Backtrace matches top of target stack.

# The top of the step_function_backtraces stack contains a
# backtrace that we want to step through. Check if the
# current backtrace ("backtrace") either matches that trace
# or otherwise contains it.
target_backtrace = step_function_backtraces[-1]
if len(backtrace) >= len(target_backtrace):
match_trace = len(backtrace) == len(target_backtrace)
# Check if backtrace contains target_backtrace, matching
# from the end (bottom of call stack) backwards.
match_subtrace = (
backtrace[-len(target_backtrace) :] == target_backtrace
)

if match_trace:
# We want to step through this function; do so and
# log the steps in the step trace.
debugger_next = True
record_step = True
break
elif match_subtrace:
# There's a function we care about buried in the
# current backtrace. Step-out until we get to it.
debugger_out = True
break
else:
# Drop backtraces that are not match_subtraces of the current
# backtrace; the functions we wanted to step through
# there are no longer reachable.
step_function_backtraces.pop()

if record_step and step_info.current_frame:
# Record the step.
update_step_watches(
step_info, self._watches, self.step_collection.commands
)
self.step_collection.new_step(self.context, step_info)

if exit_desired:
break
self.debugger.go()
elif debugger_next:
self.debugger.step_next()
elif debugger_out:
self.debugger.step_out()
else:
self.debugger.go()
time.sleep(self._pause_between_steps)
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ def _init_debugger_controller(self):

self.context.options.source_files.extend(list(new_source_files))

if "DexLimitSteps" in step_collection.commands:
cond_controller_cmds = ["DexLimitSteps", "DexStepFunction", "DexContinue"]
if any(c in step_collection.commands for c in cond_controller_cmds):
debugger_controller = ConditionalController(self.context, step_collection)
else:
debugger_controller = DefaultController(self.context, step_collection)
Expand Down
Loading
Loading