|
23 | 23 | from typing import Callable, List, Union |
24 | 24 |
|
25 | 25 |
|
| 26 | +# TTD (Time Travel Debugging) Memory Access Type parsing |
| 27 | +def parse_ttd_access_type(access_spec): |
| 28 | + """ |
| 29 | + Parse TTD memory access type from string specification. |
| 30 | + |
| 31 | + Args: |
| 32 | + access_spec: String containing access type specification. |
| 33 | + Can be combinations of 'r' (read), 'w' (write), 'e' (execute) |
| 34 | + e.g., "r", "rw", "rwe", "we", etc. |
| 35 | + |
| 36 | + Returns: |
| 37 | + DebuggerTTDMemoryAccessType enum value |
| 38 | + """ |
| 39 | + if isinstance(access_spec, str): |
| 40 | + access_value = 0 |
| 41 | + access_spec = access_spec.lower() |
| 42 | + |
| 43 | + if 'r' in access_spec: |
| 44 | + access_value |= DebuggerTTDMemoryAccessType.DebuggerTTDMemoryRead |
| 45 | + if 'w' in access_spec: |
| 46 | + access_value |= DebuggerTTDMemoryAccessType.DebuggerTTDMemoryWrite |
| 47 | + if 'e' in access_spec: |
| 48 | + access_value |= DebuggerTTDMemoryAccessType.DebuggerTTDMemoryExecute |
| 49 | + |
| 50 | + if access_value == 0: |
| 51 | + raise ValueError(f"Invalid access type specification: '{access_spec}'. Use combinations of 'r', 'w', 'e'") |
| 52 | + |
| 53 | + return access_value |
| 54 | + else: |
| 55 | + # Assume it's already a DebuggerTTDMemoryAccessType enum value |
| 56 | + return access_spec |
| 57 | + |
| 58 | + |
26 | 59 | class DebugProcess: |
27 | 60 | """ |
28 | 61 | DebugProcess represents a process in the target. It has the following fields: |
@@ -550,6 +583,144 @@ def from_string(cls, timestamp_str): |
550 | 583 | return cls(parts[0], parts[1]) |
551 | 584 |
|
552 | 585 |
|
| 586 | +class TTDMemoryEvent: |
| 587 | + """ |
| 588 | + TTDMemoryEvent represents a memory access event in a TTD trace. It has the following fields: |
| 589 | +
|
| 590 | + * ``event_type``: type of the event (e.g., "Memory") |
| 591 | + * ``thread_id``: OS thread ID that performed the memory access |
| 592 | + * ``unique_thread_id``: unique thread ID across the trace |
| 593 | + * ``time_start``: TTD position when the memory access started |
| 594 | + * ``time_end``: TTD position when the memory access ended |
| 595 | + * ``address``: memory address that was accessed |
| 596 | + * ``size``: size of the memory access |
| 597 | + * ``memory_address``: actual memory address (may differ from address field) |
| 598 | + * ``instruction_address``: address of the instruction that performed the access |
| 599 | + * ``value``: value that was read/written/executed |
| 600 | + * ``access_type``: type of access (read/write/execute) |
| 601 | + """ |
| 602 | + |
| 603 | + def __init__(self, event_type: str, thread_id: int, unique_thread_id: int, |
| 604 | + time_start: TTDPosition, time_end: TTDPosition, address: int, |
| 605 | + size: int, memory_address: int, instruction_address: int, |
| 606 | + value: int, access_type: int): |
| 607 | + self.event_type = event_type |
| 608 | + self.thread_id = thread_id |
| 609 | + self.unique_thread_id = unique_thread_id |
| 610 | + self.time_start = time_start |
| 611 | + self.time_end = time_end |
| 612 | + self.address = address |
| 613 | + self.size = size |
| 614 | + self.memory_address = memory_address |
| 615 | + self.instruction_address = instruction_address |
| 616 | + self.value = value |
| 617 | + self.access_type = access_type |
| 618 | + |
| 619 | + def __eq__(self, other): |
| 620 | + if not isinstance(other, self.__class__): |
| 621 | + return NotImplemented |
| 622 | + return (self.event_type == other.event_type and |
| 623 | + self.thread_id == other.thread_id and |
| 624 | + self.unique_thread_id == other.unique_thread_id and |
| 625 | + self.time_start == other.time_start and |
| 626 | + self.time_end == other.time_end and |
| 627 | + self.address == other.address and |
| 628 | + self.size == other.size and |
| 629 | + self.memory_address == other.memory_address and |
| 630 | + self.instruction_address == other.instruction_address and |
| 631 | + self.value == other.value and |
| 632 | + self.access_type == other.access_type) |
| 633 | + |
| 634 | + def __ne__(self, other): |
| 635 | + if not isinstance(other, self.__class__): |
| 636 | + return NotImplemented |
| 637 | + return not (self == other) |
| 638 | + |
| 639 | + def __hash__(self): |
| 640 | + return hash((self.event_type, self.thread_id, self.unique_thread_id, |
| 641 | + self.time_start, self.time_end, self.address, self.size, |
| 642 | + self.memory_address, self.instruction_address, self.value, |
| 643 | + self.access_type)) |
| 644 | + |
| 645 | + def __setattr__(self, name, value): |
| 646 | + try: |
| 647 | + object.__setattr__(self, name, value) |
| 648 | + except AttributeError: |
| 649 | + raise AttributeError(f"attribute '{name}' is read only") |
| 650 | + |
| 651 | + def __repr__(self): |
| 652 | + return f"<TTDMemoryEvent: {self.event_type} @ {self.address:#x}, thread {self.thread_id}>" |
| 653 | + |
| 654 | + |
| 655 | +class TTDCallEvent: |
| 656 | + """ |
| 657 | + TTDCallEvent represents a function call event in a TTD trace. It has the following fields: |
| 658 | +
|
| 659 | + * ``event_type``: type of the event (always "Call" for TTD.Calls objects) |
| 660 | + * ``thread_id``: OS thread ID that made the call |
| 661 | + * ``unique_thread_id``: unique thread ID across the trace |
| 662 | + * ``function``: symbolic name of the function |
| 663 | + * ``function_address``: function's address in memory |
| 664 | + * ``return_address``: instruction to return to after the call |
| 665 | + * ``return_value``: return value of the function (if not void) |
| 666 | + * ``has_return_value``: whether the function has a return value |
| 667 | + * ``parameters``: list of parameters passed to the function |
| 668 | + * ``time_start``: TTD position when call started |
| 669 | + * ``time_end``: TTD position when call ended |
| 670 | + """ |
| 671 | + |
| 672 | + def __init__(self, event_type: str, thread_id: int, unique_thread_id: int, |
| 673 | + function: str, function_address: int, return_address: int, |
| 674 | + return_value: int, has_return_value: bool, parameters: List[str], |
| 675 | + time_start: TTDPosition, time_end: TTDPosition): |
| 676 | + self.event_type = event_type |
| 677 | + self.thread_id = thread_id |
| 678 | + self.unique_thread_id = unique_thread_id |
| 679 | + self.function = function |
| 680 | + self.function_address = function_address |
| 681 | + self.return_address = return_address |
| 682 | + self.return_value = return_value |
| 683 | + self.has_return_value = has_return_value |
| 684 | + self.parameters = parameters |
| 685 | + self.time_start = time_start |
| 686 | + self.time_end = time_end |
| 687 | + |
| 688 | + def __eq__(self, other): |
| 689 | + if not isinstance(other, self.__class__): |
| 690 | + return NotImplemented |
| 691 | + return (self.event_type == other.event_type and |
| 692 | + self.thread_id == other.thread_id and |
| 693 | + self.unique_thread_id == other.unique_thread_id and |
| 694 | + self.function == other.function and |
| 695 | + self.function_address == other.function_address and |
| 696 | + self.return_address == other.return_address and |
| 697 | + self.return_value == other.return_value and |
| 698 | + self.has_return_value == other.has_return_value and |
| 699 | + self.parameters == other.parameters and |
| 700 | + self.time_start == other.time_start and |
| 701 | + self.time_end == other.time_end) |
| 702 | + |
| 703 | + def __ne__(self, other): |
| 704 | + if not isinstance(other, self.__class__): |
| 705 | + return NotImplemented |
| 706 | + return not (self == other) |
| 707 | + |
| 708 | + def __hash__(self): |
| 709 | + return hash((self.event_type, self.thread_id, self.unique_thread_id, |
| 710 | + self.function, self.function_address, self.return_address, |
| 711 | + self.return_value, self.has_return_value, tuple(self.parameters), |
| 712 | + self.time_start, self.time_end)) |
| 713 | + |
| 714 | + def __setattr__(self, name, value): |
| 715 | + try: |
| 716 | + object.__setattr__(self, name, value) |
| 717 | + except AttributeError: |
| 718 | + raise AttributeError(f"attribute '{name}' is read only") |
| 719 | + |
| 720 | + def __repr__(self): |
| 721 | + return f"<TTDCallEvent: {self.function} @ {self.function_address:#x}, thread {self.thread_id}>" |
| 722 | + |
| 723 | + |
553 | 724 | class DebuggerController: |
554 | 725 | """ |
555 | 726 | The ``DebuggerController`` object is the core of the debugger. Most debugger operations can be performed on it. |
@@ -1773,6 +1944,104 @@ def navigate_to_timestamp(self, timestamp_str): |
1773 | 1944 | binaryninja.log_error(f"Invalid timestamp format: {e}") |
1774 | 1945 | return False |
1775 | 1946 |
|
| 1947 | + def get_ttd_memory_access_for_address(self, address: int, size: int, access_type = DebuggerTTDMemoryAccessType.DebuggerTTDMemoryRead) -> List[TTDMemoryEvent]: |
| 1948 | + """ |
| 1949 | + Get TTD memory access events for a specific address range. |
| 1950 | +
|
| 1951 | + This method is only available when debugging with TTD (Time Travel Debugging). |
| 1952 | + Use the is_ttd property to check if TTD is available before calling this method. |
| 1953 | +
|
| 1954 | + :param address: starting memory address to query |
| 1955 | + :param size: size of memory region to query |
| 1956 | + :param access_type: type of memory access to query - can be: |
| 1957 | + - DebuggerTTDMemoryAccessType enum values |
| 1958 | + - String specification like "r", "w", "e", "rw", "rwe", etc. |
| 1959 | + - Integer values (for backward compatibility) |
| 1960 | + :return: list of TTDMemoryEvent objects |
| 1961 | + :raises: May raise an exception if TTD is not available |
| 1962 | + """ |
| 1963 | + # Parse access type if it's a string |
| 1964 | + parsed_access_type = parse_ttd_access_type(access_type) |
| 1965 | + |
| 1966 | + count = ctypes.c_ulonglong() |
| 1967 | + events = dbgcore.BNDebuggerGetTTDMemoryAccessForAddress(self.handle, address, size, parsed_access_type, count) |
| 1968 | + |
| 1969 | + if not events: |
| 1970 | + return [] |
| 1971 | + |
| 1972 | + result = [] |
| 1973 | + for i in range(count.value): |
| 1974 | + event = events[i] |
| 1975 | + time_start = TTDPosition(event.timeStart.sequence, event.timeStart.step) |
| 1976 | + time_end = TTDPosition(event.timeEnd.sequence, event.timeEnd.step) |
| 1977 | + |
| 1978 | + memory_event = TTDMemoryEvent( |
| 1979 | + event_type=event.eventType if event.eventType else "", |
| 1980 | + thread_id=event.threadId, |
| 1981 | + unique_thread_id=event.uniqueThreadId, |
| 1982 | + time_start=time_start, |
| 1983 | + time_end=time_end, |
| 1984 | + address=event.address, |
| 1985 | + size=event.size, |
| 1986 | + memory_address=event.memoryAddress, |
| 1987 | + instruction_address=event.instructionAddress, |
| 1988 | + value=event.value, |
| 1989 | + access_type=event.accessType |
| 1990 | + ) |
| 1991 | + result.append(memory_event) |
| 1992 | + |
| 1993 | + dbgcore.BNDebuggerFreeTTDMemoryEvents(events, count.value) |
| 1994 | + return result |
| 1995 | + |
| 1996 | + def get_ttd_calls_for_symbols(self, symbols: str, start_return_address: int = 0, end_return_address: int = 0) -> List[TTDCallEvent]: |
| 1997 | + """ |
| 1998 | + Get TTD call events for specific symbols/functions. |
| 1999 | +
|
| 2000 | + This method is only available when debugging with TTD (Time Travel Debugging). |
| 2001 | + Use the is_ttd property to check if TTD is available before calling this method. |
| 2002 | +
|
| 2003 | + :param symbols: symbol or function name to query (e.g., "MessageBoxA", "CreateFileA") |
| 2004 | + :param start_return_address: optional start return address filter (0 = no filter) |
| 2005 | + :param end_return_address: optional end return address filter (0 = no filter) |
| 2006 | + :return: list of TTDCallEvent objects |
| 2007 | + :raises: May raise an exception if TTD is not available |
| 2008 | + """ |
| 2009 | + count = ctypes.c_ulonglong() |
| 2010 | + events = dbgcore.BNDebuggerGetTTDCallsForSymbols(self.handle, symbols, start_return_address, end_return_address, count) |
| 2011 | + |
| 2012 | + if not events: |
| 2013 | + return [] |
| 2014 | + |
| 2015 | + result = [] |
| 2016 | + for i in range(count.value): |
| 2017 | + event = events[i] |
| 2018 | + time_start = TTDPosition(event.timeStart.sequence, event.timeStart.step) |
| 2019 | + time_end = TTDPosition(event.timeEnd.sequence, event.timeEnd.step) |
| 2020 | + |
| 2021 | + # Convert parameters array to Python list |
| 2022 | + parameters = [] |
| 2023 | + if event.parameters and event.parameterCount > 0: |
| 2024 | + for j in range(event.parameterCount): |
| 2025 | + parameters.append(event.parameters[j]) |
| 2026 | + |
| 2027 | + call_event = TTDCallEvent( |
| 2028 | + event_type=event.eventType if event.eventType else "", |
| 2029 | + thread_id=event.threadId, |
| 2030 | + unique_thread_id=event.uniqueThreadId, |
| 2031 | + function=event.function if event.function else "", |
| 2032 | + function_address=event.functionAddress, |
| 2033 | + return_address=event.returnAddress, |
| 2034 | + return_value=event.returnValue, |
| 2035 | + has_return_value=event.hasReturnValue, |
| 2036 | + parameters=parameters, |
| 2037 | + time_start=time_start, |
| 2038 | + time_end=time_end |
| 2039 | + ) |
| 2040 | + result.append(call_event) |
| 2041 | + |
| 2042 | + dbgcore.BNDebuggerFreeTTDCallEvents(events, count.value) |
| 2043 | + return result |
| 2044 | + |
1776 | 2045 | def __del__(self): |
1777 | 2046 | if dbgcore is not None: |
1778 | 2047 | dbgcore.BNDebuggerFreeController(self.handle) |
|
0 commit comments