|
2 | 2 | from argparse import Namespace |
3 | 3 | from pathlib import Path |
4 | 4 | from types import TracebackType |
5 | | -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Optional, Tuple, Type |
| 5 | +from typing import ( |
| 6 | + TYPE_CHECKING, |
| 7 | + Any, |
| 8 | + Callable, |
| 9 | + Dict, |
| 10 | + Iterable, |
| 11 | + Iterator, |
| 12 | + Optional, |
| 13 | + Tuple, |
| 14 | + Type, |
| 15 | +) |
6 | 16 |
|
7 | 17 | import numpy as np |
8 | 18 | import wkw |
@@ -243,15 +253,46 @@ def write( |
243 | 253 | current_mag_bbox = mag1_bbox.in_mag(self._mag) |
244 | 254 |
|
245 | 255 | if self._is_compressed(): |
246 | | - current_mag_bbox, data = self._handle_compressed_write( |
| 256 | + for current_mag_bbox, chunked_data in self._prepare_compressed_write( |
247 | 257 | current_mag_bbox, data |
248 | | - ) |
| 258 | + ): |
| 259 | + self._array.write(current_mag_bbox.topleft, chunked_data) |
| 260 | + else: |
| 261 | + self._array.write(current_mag_bbox.topleft, data) |
249 | 262 |
|
250 | | - self._array.write(current_mag_bbox.topleft, data) |
| 263 | + def _prepare_compressed_write( |
| 264 | + self, current_mag_bbox: BoundingBox, data: np.ndarray |
| 265 | + ) -> Iterator[Tuple[BoundingBox, np.ndarray]]: |
| 266 | + """This method takes an arbitrary sized chunk of data with an accompanying bbox, |
| 267 | + divides these into chunks of shard_shape size and delegates |
| 268 | + the preparation to _prepare_compressed_write_chunk.""" |
| 269 | + |
| 270 | + chunked_bboxes = current_mag_bbox.chunk( |
| 271 | + self.info.shard_shape, |
| 272 | + chunk_border_alignments=self.info.shard_shape, |
| 273 | + ) |
| 274 | + for chunked_bbox in chunked_bboxes: |
| 275 | + source_slice: Any |
| 276 | + if len(data.shape) == 3: |
| 277 | + source_slice = chunked_bbox.offset( |
| 278 | + -current_mag_bbox.topleft |
| 279 | + ).to_slices() |
| 280 | + else: |
| 281 | + source_slice = (slice(None, None),) + chunked_bbox.offset( |
| 282 | + -current_mag_bbox.topleft |
| 283 | + ).to_slices() |
251 | 284 |
|
252 | | - def _handle_compressed_write( |
| 285 | + yield self._prepare_compressed_write_chunk(chunked_bbox, data[source_slice]) |
| 286 | + |
| 287 | + def _prepare_compressed_write_chunk( |
253 | 288 | self, current_mag_bbox: BoundingBox, data: np.ndarray |
254 | 289 | ) -> Tuple[BoundingBox, np.ndarray]: |
| 290 | + """This method takes an arbitrary sized chunk of data with an accompanying bbox |
| 291 | + (ideally not larger than a shard) and enlarges that chunk to fit the shard it |
| 292 | + resides in (by reading the entire shard data and writing the passed data ndarray |
| 293 | + into the specified volume). That way, the returned data can be written as a whole |
| 294 | + shard which is a requirement for compressed writes.""" |
| 295 | + |
255 | 296 | aligned_bbox = current_mag_bbox.align_with_mag(self.info.shard_shape, ceil=True) |
256 | 297 |
|
257 | 298 | if current_mag_bbox != aligned_bbox: |
|
0 commit comments