Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from dex.command.commands.DexFinishTest import DexFinishTest
from dex.command.commands.DexUnreachable import DexUnreachable
from dex.command.commands.DexWatch import DexWatch
from dex.command.commands.DexStepFunction import DexStepFunction
from dex.command.commands.DexContinue import DexContinue
from dex.utils import Timer
from dex.utils.Exceptions import CommandParseError, DebuggerException

Expand All @@ -59,6 +61,8 @@ def _get_valid_commands():
DexFinishTest.get_name(): DexFinishTest,
DexUnreachable.get_name(): DexUnreachable,
DexWatch.get_name(): DexWatch,
DexStepFunction.get_name(): DexStepFunction,
DexContinue.get_name(): DexContinue,
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,6 @@ def _handle_message(
else:
pass
elif message["type"] == "response":
request_seq = message["request_seq"]
debugger_state.set_response(request_seq, message)
# TODO: We also receive a "continued" event, but it seems reasonable to set state based on either the
# response or the event, since the DAP does not specify an order in which they are sent. May need revisiting
# if there turns out to be some odd ordering issues, e.g. if we can receive messages in the order
Expand All @@ -409,6 +407,10 @@ def _handle_message(
body = message.get("body")
if body:
debugger_state.capabilities.update(logger, body)
# Now we've done whatever we need to do with the response, tell the
# receiver thread we've got it.
request_seq = message["request_seq"]
debugger_state.set_response(request_seq, message)

def _colorize_dap_message(message: dict) -> dict:
colorized_message = copy.deepcopy(message)
Expand Down Expand Up @@ -696,11 +698,12 @@ def get_triggered_breakpoint_ids(self):
# Breakpoints can only have been triggered if we've hit one.
stop_reason = self._translate_stop_reason(self._debugger_state.stopped_reason)
if stop_reason != StopReason.BREAKPOINT:
return []
return set()
breakpoint_ids = set(
[
dex_id
for dap_id in self._debugger_state.stopped_bps
if dap_id in self.dap_id_to_dex_ids
for dex_id in self.dap_id_to_dex_ids[dap_id]
]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dex.debugger.DebuggerBase import DebuggerBase
from dex.utils.Exceptions import DebuggerException
from dex.utils.Timeout import Timeout

from dex.dextIR import LocIR

class BreakpointRange:
"""A range of breakpoints and a set of conditions.
Expand Down Expand Up @@ -51,6 +51,9 @@ def __init__(
values: list,
hit_count: int,
finish_on_remove: bool,
is_continue: bool = False,
function: str = None,
addr: str = None,
):
self.expression = expression
self.path = path
Expand All @@ -60,6 +63,72 @@ def __init__(
self.max_hit_count = hit_count
self.current_hit_count = 0
self.finish_on_remove = finish_on_remove
self.is_continue = is_continue
self.function = function
self.addr = addr

def limit_steps(
expression: str,
path: str,
range_from: int,
range_to: int,
values: list,
hit_count: int,
):
return BreakpointRange(
expression,
path,
range_from,
range_to,
values,
hit_count,
False,
)

def finish_test(
expression: str, path: str, on_line: int, values: list, hit_count: int
):
return BreakpointRange(
expression,
path,
on_line,
on_line,
values,
hit_count,
True,
)

def continue_from_to(
expression: str,
path: str,
from_line: int,
to_line: int,
values: list,
hit_count: int,
):
return BreakpointRange(
expression,
path,
from_line,
to_line,
values,
hit_count,
finish_on_remove=False,
is_continue=True,
)

def step_function(function: str, path: str, hit_count: int):
return BreakpointRange(
None,
path,
None,
None,
None,
hit_count,
finish_on_remove=False,
is_continue=False,
function=function,
)

def has_conditions(self):
return self.expression is not None
Expand Down Expand Up @@ -96,34 +165,40 @@ def __init__(self, context, step_collection):
def _build_bp_ranges(self):
commands = self.step_collection.commands
self._bp_ranges = []
try:
limit_commands = commands["DexLimitSteps"]
for lc in limit_commands:
bpr = BreakpointRange(
lc.expression,
lc.path,
lc.from_line,
lc.to_line,
lc.values,
lc.hit_count,
False,
)
self._bp_ranges.append(bpr)
except KeyError:

cond_controller_cmds = ["DexLimitSteps", "DexStepFunction", "DexContinue"]
if not any(c in commands for c in cond_controller_cmds):
raise DebuggerException(
"Missing DexLimitSteps commands, cannot conditionally step."
f"No conditional commands {cond_controller_cmds}, cannot conditionally step."
)

if "DexLimitSteps" in commands:
for c in commands["DexLimitSteps"]:
bpr = BreakpointRange.limit_steps(
c.expression,
c.path,
c.from_line,
c.to_line,
c.values,
c.hit_count,
)
self._bp_ranges.append(bpr)
if "DexFinishTest" in commands:
finish_commands = commands["DexFinishTest"]
for ic in finish_commands:
bpr = BreakpointRange(
ic.expression,
ic.path,
ic.on_line,
ic.on_line,
ic.values,
ic.hit_count + 1,
True,
for c in commands["DexFinishTest"]:
bpr = BreakpointRange.finish_test(
c.expression, c.path, c.on_line, c.values, c.hit_count + 1
)
self._bp_ranges.append(bpr)
if "DexContinue" in commands:
for c in commands["DexContinue"]:
bpr = BreakpointRange.continue_from_to(
c.expression, c.path, c.from_line, c.to_line, c.values, c.hit_count
)
self._bp_ranges.append(bpr)
if "DexStepFunction" in commands:
for c in commands["DexStepFunction"]:
bpr = BreakpointRange.step_function(
c.get_function(), c.path, c.hit_count
)
self._bp_ranges.append(bpr)

Expand All @@ -138,6 +213,9 @@ def _set_leading_bps(self):
bpr.path, bpr.range_from, cond_expr
)
self._leading_bp_handles[id] = bpr
elif bpr.function is not None:
id = self.debugger.add_function_breakpoint(bpr.function)
self._leading_bp_handles[id] = bpr
else:
# Add an unconditional breakpoint.
id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
Expand All @@ -163,6 +241,9 @@ def _run_debugger_custom(self, cmdline):
timed_out = False
total_timeout = Timeout(self.context.options.timeout_total)

step_function_backtraces: list[list[str]] = []
self.instr_bp_ids = set()

while not self.debugger.is_finished:
breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
while self.debugger.is_running and not timed_out:
Expand All @@ -185,21 +266,26 @@ def _run_debugger_custom(self, cmdline):
break

step_info = self.debugger.get_step_info(self._watches, self._step_index)
backtrace = None
if step_info.current_frame:
self._step_index += 1
update_step_watches(
step_info, self._watches, self.step_collection.commands
)
self.step_collection.new_step(self.context, step_info)
backtrace = [f.function for f in step_info.frames]

record_step = False
debugger_continue = False
bp_to_delete = []
for bp_id in self.debugger.get_triggered_breakpoint_ids():
try:
# See if this is one of our leading breakpoints.
bpr = self._leading_bp_handles[bp_id]
record_step = True
except KeyError:
# This is a trailing bp. Mark it for removal.
bp_to_delete.append(bp_id)
if bp_id in self.instr_bp_ids:
self.instr_bp_ids.remove(bp_id)
else:
record_step = True
continue

bpr.add_hit()
Expand All @@ -208,17 +294,93 @@ def _run_debugger_custom(self, cmdline):
exit_desired = True
bp_to_delete.append(bp_id)
del self._leading_bp_handles[bp_id]
# Add a range of trailing breakpoints covering the lines
# requested in the DexLimitSteps command. Ignore first line as
# that's covered by the leading bp we just hit and include the
# final line.
for line in range(bpr.range_from + 1, bpr.range_to + 1):
self.debugger.add_breakpoint(bpr.path, line)

if bpr.function is not None:
if step_info.frames:
# Add this backtrace to the stack. While the current
# backtrace matches the top of the stack we'll step,
# and while there's a backtrace in the stack that
# is a subset of the current backtrace we'll step-out.
if (
len(step_function_backtraces) == 0
or backtrace != step_function_backtraces[-1]
):
step_function_backtraces.append(backtrace)

# Add an address breakpoint so we don't fall out
# the end of nested DexStepFunctions with a DexContinue.
addr = self.debugger.get_pc(frame_idx=1)
instr_id = self.debugger.add_instruction_breakpoint(addr)
# Note the breakpoint so we don't log the source location
# it in the trace later.
self.instr_bp_ids.add(instr_id)

elif bpr.is_continue:
debugger_continue = True
if bpr.range_to is not None:
self.debugger.add_breakpoint(bpr.path, bpr.range_to)

else:
# Add a range of trailing breakpoints covering the lines
# requested in the DexLimitSteps command. Ignore first line as
# that's covered by the leading bp we just hit and include the
# final line.
for line in range(bpr.range_from + 1, bpr.range_to + 1):
id = self.debugger.add_breakpoint(bpr.path, line)

# Remove any trailing or expired leading breakpoints we just hit.
self.debugger.delete_breakpoints(bp_to_delete)

debugger_next = False
debugger_out = False
if not debugger_continue and step_info.current_frame and step_info.frames:
while len(step_function_backtraces) > 0:
match_subtrace = False # Backtrace contains a target trace.
match_trace = False # Backtrace matches top of target stack.

# The top of the step_function_backtraces stack contains a
# backtrace that we want to step through. Check if the
# current backtrace ("backtrace") either matches that trace
# or otherwise contains it.
target_backtrace = step_function_backtraces[-1]
if len(backtrace) >= len(target_backtrace):
match_trace = len(backtrace) == len(target_backtrace)
# Check if backtrace contains target_backtrace, matching
# from the end (bottom of call stack) backwards.
match_subtrace = (
backtrace[-len(target_backtrace) :] == target_backtrace
)

if match_trace:
# We want to step through this function; do so and
# log the steps in the step trace.
debugger_next = True
record_step = True
break
elif match_subtrace:
# There's a function we care about buried in the
# current backtrace. Step-out until we get to it.
debugger_out = True
break
else:
# Drop backtraces that are not match_subtraces of the current
# backtrace; the functions we wanted to step through
# there are no longer reachable.
step_function_backtraces.pop()

if record_step and step_info.current_frame:
# Record the step.
update_step_watches(
step_info, self._watches, self.step_collection.commands
)
self.step_collection.new_step(self.context, step_info)

if exit_desired:
break
self.debugger.go()
elif debugger_next:
self.debugger.step_next()
elif debugger_out:
self.debugger.step_out()
else:
self.debugger.go()
time.sleep(self._pause_between_steps)
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ def _init_debugger_controller(self):

self.context.options.source_files.extend(list(new_source_files))

if "DexLimitSteps" in step_collection.commands:
cond_controller_cmds = ["DexLimitSteps", "DexStepFunction", "DexContinue"]
if any(c in step_collection.commands for c in cond_controller_cmds):
debugger_controller = ConditionalController(self.context, step_collection)
else:
debugger_controller = DefaultController(self.context, step_collection)
Expand Down
Loading
Loading