|
21 | 21 | import traceback |
22 | 22 | import ctypes |
23 | 23 | import abc |
| 24 | +import io |
| 25 | +import zipfile |
24 | 26 | from typing import List, Optional, Union |
25 | 27 |
|
26 | 28 | # Binary Ninja components |
27 | 29 | import binaryninja |
28 | | -from .log import log_error_for_exception |
| 30 | +from .log import log_error_for_exception, log_error |
29 | 31 | from . import databuffer |
30 | 32 | from . import binaryview |
31 | 33 | from . import metadata |
32 | 34 | from . import _binaryninjacore as core |
33 | | -from .enums import TransformType, TransformResult |
| 35 | +from .enums import TransformCapabilities, TransformResult, TransformType |
| 36 | +from .settings import Settings |
34 | 37 |
|
35 | 38 |
|
36 | 39 | class _TransformMetaClass(type): |
@@ -254,7 +257,7 @@ def _decode_with_context(self, ctxt, context, params, count): |
254 | 257 |
|
255 | 258 | def _can_decode(self, ctxt, input): |
256 | 259 | try: |
257 | | - input_obj = binaryview.BinaryView(handle=core.BNNewViewReference(input)) |
| 260 | + input_obj = binaryview.BinaryView(handle=input) |
258 | 261 | return self.can_decode(input_obj) |
259 | 262 | except: |
260 | 263 | log_error_for_exception("Unhandled Python exception in Transform._can_decode") |
@@ -549,6 +552,11 @@ def transform_result(self) -> TransformResult: |
549 | 552 | """Get the transform result""" |
550 | 553 | return TransformResult(core.BNTransformContextGetTransformResult(self.handle)) |
551 | 554 |
|
| 555 | + @transform_result.setter |
| 556 | + def transform_result(self, result: TransformResult): |
| 557 | + """Set the transform result""" |
| 558 | + core.BNTransformContextSetTransformResult(self.handle, result) |
| 559 | + |
552 | 560 | @property |
553 | 561 | def parent(self) -> Optional['TransformContext']: |
554 | 562 | """Get the parent context""" |
@@ -808,3 +816,137 @@ def set_selected_contexts(self, contexts: Union[List['TransformContext'], 'Trans |
808 | 816 | for i, ctx in enumerate(contexts): |
809 | 817 | context_array[i] = ctx.handle |
810 | 818 | core.BNTransformSessionSetSelectedContexts(self.handle, context_array, len(contexts)) |
| 819 | + |
| 820 | + |
| 821 | +class ZipPython(Transform): |
| 822 | + """ |
| 823 | + ``ZipPython`` is a transform that handles ZIP archive decoding using Python's built-in zipfile module. |
| 824 | + It supports password-protected archives and context-aware extraction of multiple files. It is provided |
| 825 | + as a reference implementation and may not be as performant as native implementations. By default, this |
| 826 | + Transform is not registered; to use it, call `ZipPython.register()`. |
| 827 | + """ |
| 828 | + transform_type = TransformType.DecodeTransform |
| 829 | + capabilities = TransformCapabilities.TransformSupportsDetection | TransformCapabilities.TransformSupportsContext |
| 830 | + name = "ZipPython" |
| 831 | + long_name = "Zip (Python)" |
| 832 | + group = "Container" |
| 833 | + |
| 834 | + def can_decode(self, input) -> bool: |
| 835 | + try: |
| 836 | + head = input.read(0, 4) |
| 837 | + if len(head) < 4 or head[0:2] != b"PK": |
| 838 | + return False |
| 839 | + signature = head[2] | (head[3] << 8) |
| 840 | + return signature in (0x0403, 0x0201, 0x0605, 0x0708, 0x0606, 0x0706, 0x0505) |
| 841 | + except Exception: |
| 842 | + log_error("ZipPython: failed to read from BinaryView for signature check") |
| 843 | + return False |
| 844 | + |
| 845 | + def perform_decode(self, data: bytes, params: dict) -> bytes | None: |
| 846 | + try: |
| 847 | + zf = zipfile.ZipFile(io.BytesIO(data), "r") |
| 848 | + except Exception: |
| 849 | + log_error("ZipPython: failed to open data as ZIP") |
| 850 | + return None |
| 851 | + |
| 852 | + filename = None |
| 853 | + if "filename" in params: |
| 854 | + p = params["filename"] |
| 855 | + filename = p.decode("utf-8", "replace") if isinstance(p, (bytes, bytearray)) else str(p) |
| 856 | + elif zf.namelist(): |
| 857 | + filename = zf.namelist()[0] |
| 858 | + try: |
| 859 | + if filename: |
| 860 | + with zf.open(filename, "r") as f: |
| 861 | + return f.read() |
| 862 | + except (KeyError, RuntimeError, zipfile.BadZipFile, zipfile.LargeZipFile): |
| 863 | + log_error(f"ZipPython: failed to extract member '{filename}' from ZIP") |
| 864 | + return None |
| 865 | + |
| 866 | + def perform_encode(self, data, params): |
| 867 | + return None |
| 868 | + |
| 869 | + def perform_decode_with_context(self, context, params) -> bool: |
| 870 | + """ |
| 871 | + ``perform_decode_with_context`` implements context-aware ZIP extraction. |
| 872 | + Two-phase flow: |
| 873 | + - Phase 1 (discovery): populate available_files, return False. |
| 874 | + - Phase 2 (extraction): for each requested file, create a child context and return |
| 875 | + True if all succeeded, False otherwise. |
| 876 | + """ |
| 877 | + try: |
| 878 | + zf = zipfile.ZipFile(io.BytesIO(context.input.read(0, context.input.length)), "r") |
| 879 | + except Exception: |
| 880 | + context.transform_result = TransformResult.TransformFailure |
| 881 | + log_error(f"ZipPython: failed to open context input as ZIP: len={context.input.length}") |
| 882 | + return False |
| 883 | + |
| 884 | + # Build the file list (non-directories only) |
| 885 | + files: List[str] = [n for n in zf.namelist() if not n.endswith("/")] |
| 886 | + |
| 887 | + # Phase 1: discovery |
| 888 | + if not context.has_available_files: |
| 889 | + context.set_available_files(files) |
| 890 | + return False |
| 891 | + |
| 892 | + # Phase 2: extraction |
| 893 | + requested = context.requested_files |
| 894 | + if not requested: |
| 895 | + return False |
| 896 | + |
| 897 | + passwords = Settings().get_string_list('files.container.defaultPasswords') |
| 898 | + if "password" in params: |
| 899 | + p = params["password"] |
| 900 | + passwords.insert(0, p.decode("utf-8", "replace") if isinstance(p, (bytes, bytearray)) else str(p)) |
| 901 | + passwords = [None] + passwords |
| 902 | + |
| 903 | + complete = True |
| 904 | + for name in requested: |
| 905 | + if name not in files: |
| 906 | + msg = f"Requested file '{name}' not found in ZIP" |
| 907 | + context.create_child(databuffer.DataBuffer(b""), name, result=TransformResult.TransformFailure, message=msg) |
| 908 | + complete = False |
| 909 | + continue |
| 910 | + |
| 911 | + content = None |
| 912 | + error = None |
| 913 | + successful_password = None |
| 914 | + for password in passwords: |
| 915 | + try: |
| 916 | + if password is None: |
| 917 | + with zf.open(name, "r") as f: |
| 918 | + content = f.read() |
| 919 | + else: |
| 920 | + pwd = password.encode('utf-8') if isinstance(password, str) else password |
| 921 | + with zf.open(name, "r", pwd=pwd) as f: |
| 922 | + content = f.read() |
| 923 | + successful_password = password |
| 924 | + break |
| 925 | + |
| 926 | + except RuntimeError as e: |
| 927 | + error = e |
| 928 | + if 'password' not in str(e).lower() and 'encrypted' not in str(e).lower(): |
| 929 | + break |
| 930 | + except Exception as e: |
| 931 | + error = e |
| 932 | + break |
| 933 | + |
| 934 | + if successful_password is not None and successful_password in passwords: |
| 935 | + passwords.remove(successful_password) |
| 936 | + passwords.insert(0, successful_password) |
| 937 | + |
| 938 | + if content is not None: |
| 939 | + context.create_child(databuffer.DataBuffer(content), name) |
| 940 | + else: |
| 941 | + if error is None: |
| 942 | + error = RuntimeError(f"Failed to decrypt '{name}' with any provided password") |
| 943 | + if isinstance(error, RuntimeError) and 'password' in str(error).lower(): |
| 944 | + transformresult = TransformResult.TransformRequiresPassword |
| 945 | + else: |
| 946 | + transformresult = TransformResult.TransformFailure |
| 947 | + log_error(f"ZipPython: failed to extract requested file '{name}': {error}") |
| 948 | + |
| 949 | + context.create_child(databuffer.DataBuffer(b""), name, result=transformresult, message=str(error)) |
| 950 | + complete = False |
| 951 | + |
| 952 | + return complete |
0 commit comments