|
4 | 4 | from .Literals import LiteralValue, LiteralString, TodoStub |
5 | 5 | from .SnapshotSystem import DiskStorage |
6 | 6 | from .RoundTrip import Roundtrip |
| 7 | +import base64 |
7 | 8 |
|
8 | 9 | T = TypeVar("T") |
9 | 10 |
|
@@ -75,3 +76,111 @@ def _to_be_impl(self, snapshot: Optional[str]) -> T: |
75 | 76 | raise Exception("Can't call `to_be_todo` in readonly mode!") |
76 | 77 | else: |
77 | 78 | return self.roundtrip.parse(snapshot) |
| 79 | + |
| 80 | + |
| 81 | +class CacheSelfieBinary(Generic[T]): |
| 82 | + def __init__(self, disk: DiskStorage, roundtrip: Roundtrip[T, bytes], generator): |
| 83 | + self.disk: DiskStorage = disk |
| 84 | + self.roundtrip: Roundtrip[T, bytes] = roundtrip |
| 85 | + self.generator = generator |
| 86 | + |
| 87 | + def to_match_disk(self, sub: str = "") -> T: |
| 88 | + return self._to_match_disk_impl(sub, False) |
| 89 | + |
| 90 | + def to_match_disk_TODO(self, sub: str = "") -> T: |
| 91 | + return self._to_match_disk_impl(sub, True) |
| 92 | + |
| 93 | + def _to_match_disk_impl(self, sub: str, is_todo: bool) -> T: |
| 94 | + from .Selfie import _selfieSystem |
| 95 | + |
| 96 | + system = _selfieSystem() |
| 97 | + call = recordCall(False) |
| 98 | + |
| 99 | + if system.mode.can_write(is_todo, call, system): |
| 100 | + actual = self.generator() |
| 101 | + serialized_data = self.roundtrip.serialize(actual) |
| 102 | + self.disk.write_disk(Snapshot.of(serialized_data), sub, call) |
| 103 | + |
| 104 | + if is_todo: |
| 105 | + system.write_inline(TodoStub.to_match_disk.create_literal(), call) |
| 106 | + |
| 107 | + return actual |
| 108 | + else: |
| 109 | + if is_todo: |
| 110 | + raise Exception("Can't call `to_match_disk_TODO` in read-only mode!") |
| 111 | + else: |
| 112 | + snapshot = self.disk.read_disk(sub, call) |
| 113 | + |
| 114 | + if snapshot is None: |
| 115 | + raise Exception(system.mode.msg_snapshot_not_found()) |
| 116 | + |
| 117 | + if snapshot.subject.is_binary or len(snapshot.facets) > 0: |
| 118 | + raise Exception( |
| 119 | + "Expected a binary subject with no facets, got {}".format( |
| 120 | + snapshot |
| 121 | + ) |
| 122 | + ) |
| 123 | + |
| 124 | + return self.roundtrip.parse(snapshot.subject.value_binary()) |
| 125 | + |
| 126 | + def to_be_file_TODO(self, subpath: str) -> T: |
| 127 | + return self._to_be_file_impl(subpath, True) |
| 128 | + |
| 129 | + def to_be_file(self, subpath: str) -> T: |
| 130 | + return self._to_be_file_impl(subpath, False) |
| 131 | + |
| 132 | + def _to_be_file_impl(self, subpath: str, is_todo: bool) -> T: |
| 133 | + from .Selfie import _selfieSystem |
| 134 | + |
| 135 | + system = _selfieSystem() |
| 136 | + call = recordCall(False) |
| 137 | + writable = system.mode.can_write(is_todo, call, system) |
| 138 | + |
| 139 | + if writable: |
| 140 | + actual = self.generator() |
| 141 | + |
| 142 | + if is_todo: |
| 143 | + system.write_inline(TodoStub.to_be_file.create_literal(), call) |
| 144 | + |
| 145 | + with open(subpath, "wb") as file: |
| 146 | + file.write(self.roundtrip.serialize(actual)) |
| 147 | + |
| 148 | + return actual |
| 149 | + else: |
| 150 | + if is_todo: |
| 151 | + raise Exception("Can't call `toBeFile_TODO` in read-only mode!") |
| 152 | + else: |
| 153 | + with open(subpath, "rb") as file: |
| 154 | + serialized_data = file.read() |
| 155 | + return self.roundtrip.parse(serialized_data) |
| 156 | + |
| 157 | + def to_be_base64_TODO(self, unused_arg: Optional[Any] = None) -> T: |
| 158 | + return self._to_be_base64_impl(None) |
| 159 | + |
| 160 | + def to_be_base64(self, snapshot: str) -> T: |
| 161 | + return self._to_be_base64_impl(snapshot) |
| 162 | + |
| 163 | + def _to_be_base64_impl(self, snapshot: Optional[str]) -> T: |
| 164 | + from .Selfie import _selfieSystem |
| 165 | + |
| 166 | + system = _selfieSystem() |
| 167 | + call = recordCall(False) |
| 168 | + writable = system.mode.can_write(snapshot is None, call, system) |
| 169 | + |
| 170 | + if writable: |
| 171 | + actual = self.generator() |
| 172 | + base64_data = base64.b64encode(self.roundtrip.serialize(actual)).decode( |
| 173 | + "utf-8" |
| 174 | + ) |
| 175 | + literal_string_formatter = LiteralString() |
| 176 | + system.write_inline( |
| 177 | + LiteralValue(snapshot, base64_data, literal_string_formatter), |
| 178 | + call, |
| 179 | + ) |
| 180 | + return actual |
| 181 | + else: |
| 182 | + if snapshot is None: |
| 183 | + raise Exception("Can't call `toBe_TODO` in read-only mode!") |
| 184 | + else: |
| 185 | + decoded_data = base64.b64decode(snapshot.encode("utf-8")) |
| 186 | + return self.roundtrip.parse(decoded_data) |
0 commit comments