|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import urllib.parse |
| 4 | +import zipfile |
| 5 | +from functools import cached_property |
| 6 | +from typing import TYPE_CHECKING, BinaryIO |
| 7 | + |
| 8 | +from dissect.evidence.aff4.metadata import DiskImage, FileImage, Information, Object, ValueType |
| 9 | +from dissect.evidence.aff4.util import parse_turtle |
| 10 | + |
| 11 | +if TYPE_CHECKING: |
| 12 | + import pathlib |
| 13 | + |
# Upper bound on the number of segments ``AFF4.segment`` keeps open at the same time;
# once this many are open, the least recently used one is evicted before opening another.
MAX_OPEN_SEGMENTS = 128
| 15 | + |
| 16 | + |
class AFF4:
    """AFF4 evidence container.

    On construction every given segment is opened once to record its URI and to merge
    the per-segment information into a single :class:`Information` object. At most
    ``MAX_OPEN_SEGMENTS`` segments are kept open at any time (see :meth:`segment`).

    Args:
        fh: A file-like object, ``pathlib.Path`` or a list of those representing the AFF4 segments.
    """

    def __init__(self, fh: BinaryIO | list[BinaryIO] | pathlib.Path | list[pathlib.Path]):
        fhs = [fh] if not isinstance(fh, list) else fh

        self.fh = fhs
        # Currently open segments, keyed by their index into ``self.fh``
        self._segments: dict[int, Segment] = {}
        # Indices of the open segments, least recently used first
        self._segment_lru: list[int] = []
        # Segment URI -> index into ``self.fh``
        self._segment_map: dict[str, int] = {}

        all_information: dict[str, Object] = {}

        for idx in range(len(self.fh)):
            segment = self.segment(idx)

            self._segment_map[segment.uri] = idx
            for key, value in segment.information.items():
                # If several segments describe the same key, keep the longest value
                if key in all_information and len(value) < len(all_information[key]):
                    continue
                all_information[key] = value

        self.information = Information(self, all_information)

    def segment(self, idx: int | str) -> Segment:
        """Open a segment by index or URI.

        Implements a simple LRU cache to limit the number of open segments.

        Args:
            idx: Index or URI of the segment to open.

        Returns:
            The opened :class:`Segment` object.

        Raises:
            KeyError: If ``idx`` is a URI that is not known to this container.
        """
        if isinstance(idx, str):
            idx = self._segment_map[idx]

        # Poor man's LRU: the most recently used index lives at the end of the list
        if idx in self._segments:
            self._segment_lru.remove(idx)
            self._segment_lru.append(idx)
            return self._segments[idx]

        if len(self._segment_lru) >= MAX_OPEN_SEGMENTS:
            self._evict_oldest()

        fh = self.fh[idx]
        # Paths that point to a regular file are opened here; anything else (file-like
        # objects, or paths that are not regular files) is handed to Segment unchanged.
        opened_here = not hasattr(fh, "read") and fh.is_file()
        if opened_here:
            fh = fh.open("rb")

        try:
            segment = Segment(self, fh)
        except Exception:
            # Don't leak a handle we just opened if the segment cannot be constructed
            if opened_here:
                fh.close()
            raise

        self._segments[idx] = segment
        self._segment_lru.append(idx)

        return segment

    def _evict_oldest(self) -> None:
        """Drop the least recently used segment, closing its file handle if we opened it."""
        oldest_idx = self._segment_lru.pop(0)
        oldest_segment = self._segments.pop(oldest_idx)

        # Only close handles that were opened from a path by ``segment()``. A file-like
        # object supplied by the caller is left open so the segment can be re-created
        # from it later; a path used directly as the segment root has nothing to close.
        if hasattr(oldest_segment.fh, "close") and not hasattr(self.fh[oldest_idx], "read"):
            oldest_segment.fh.close()

    def disks(self) -> list[DiskImage]:
        """List all disk images in the AFF4 evidence."""
        return list(self.information.find("DiskImage"))

    def files(self) -> list[FileImage]:
        """List all file images in the AFF4 evidence."""
        return list(self.information.find("FileImage"))
| 93 | + |
| 94 | + |
class Segment:
    """AFF4 segment.

    A file-like object is opened as a ZIP archive and members are resolved inside it;
    anything else is used directly as the root path for member lookups.

    Args:
        aff4: The parent :class:`AFF4` object.
        fh: A file-like object or ``pathlib.Path`` representing the segment.
    """

    def __init__(self, aff4: AFF4, fh: BinaryIO | pathlib.Path):
        self.aff4 = aff4
        self.fh = fh
        # Only set when the segment is backed by a ZIP archive
        self._zip: zipfile.ZipFile | None = None

        if hasattr(self.fh, "read"):
            self._zip = zipfile.ZipFile(self.fh)
            self.path = zipfile.Path(self._zip)
        else:
            self.path = fh

    @cached_property
    def uri(self) -> str:
        """Return the URI of the segment.

        The URI is read from ``container.description`` if that member exists, otherwise
        from the ZIP archive comment (everything before the first NUL byte).

        Raises:
            ValueError: If neither source provides a URI.
        """
        if (path := self.path.joinpath("container.description")).exists():
            return path.read_text()

        if self._zip is not None and self._zip.comment:
            # ``bytes.split`` returns a list, so take the part before the first NUL byte
            return self._zip.comment.split(b"\x00", 1)[0].decode()

        raise ValueError("No URI found in segment")

    @cached_property
    def version(self) -> dict[str, str]:
        """Return the version information of the segment.

        Every ``key=value`` line of ``version.txt`` becomes one dictionary entry; lines
        without an ``=`` are ignored.

        Raises:
            ValueError: If the segment has no ``version.txt``.
        """
        if not (path := self.path.joinpath("version.txt")).exists():
            raise ValueError("No version.txt found in segment")

        result = {}
        with path.open("rt") as fh:
            for line in fh:
                if "=" in line:
                    key, _, value = line.strip().partition("=")
                    result[key] = value

        return result

    @cached_property
    def information(self) -> dict[str, ValueType]:
        """Return the parsed ``information.turtle`` of the segment.

        Raises:
            ValueError: If the segment has no ``information.turtle``.
        """
        if not (path := self.path.joinpath("information.turtle")).exists():
            raise ValueError("No information.turtle found in segment")

        with path.open("rt") as fh:
            return parse_turtle(fh)

    def get(self, path: str) -> pathlib.Path | zipfile.Path:
        """Resolve a path of a file in the segment.

        A path that starts with the segment URI is resolved relative to the segment
        root; any other path is URL-encoded (``quote_plus``) and looked up as a single
        member name.

        Args:
            path: Path to the file in the segment.

        Returns:
            A :class:`Path` or :class:`zipfile.Path` object representing the file.
        """
        if path.startswith(self.uri):
            # Drop the leading slash: joining an absolute component would discard the
            # segment root instead of resolving inside it.
            path = path.removeprefix(self.uri).lstrip("/")
        else:
            path = urllib.parse.quote_plus(path)
        return self.path.joinpath(path)
0 commit comments