Commit 5234b55
Efficient Per-Z Intermediates

- Switched to raw byte saving rather than TIFF files (smaller, faster).
- Now appending to a single per-Z file, so each entry can be written (and read) sequentially, with file locking.
- Temp files are now cleaned up as soon as they are no longer needed.
- Indices are now written as a squeeze type, and dtypes are passed in the intermediate.
- Intermediate folders are now PID-tagged and cleaned up when done, to avoid issues with consecutive runs.
1 parent d6232b4 commit 5234b55
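
For context, here is a minimal sketch of the raw record layout this commit introduces: a 6-field uint32 header, three 'S8' dtype codes, then the index, value, and weight arrays, written and read back in the same field order as write_raw_intermediate and load_raw_file_intermediate below. The toy data and temp path are illustrative only.

import numpy as np
import tempfile, os

# Toy record: flat indices, one value channel, and weights.
index = np.array([3, 7, 42], dtype=np.uint32)
values = np.array([1.5, 2.5, 3.5], dtype=np.float32)
weights = np.array([1.0, 1.0, 0.5], dtype=np.float32)

# source_rows, target_rows, offset_rows, offset_columns, channel_count, data_length
header = np.array([512, 1024, 0, 0, 1, len(index)], dtype=np.uint32)
types = np.array([index.dtype.str, values.dtype.str, weights.dtype.str], dtype='S8')

path = os.path.join(tempfile.mkdtemp(), "i_00000.dat")
with open(path, "ab") as target:
    for entry in (header.tobytes(), types.tobytes(),
                  index.tobytes(), values.tobytes(), weights.tobytes()):
        target.write(entry)

# Read it back with the same field order as load_raw_file_intermediate.
with open(path, "rb") as handle:
    meta = np.fromfile(handle, np.uint32, 6)
    t_index, t_value, t_weight = [np.dtype(c.decode()).type for c in np.fromfile(handle, 'S8', 3)]
    n = meta[5]
    print(np.fromfile(handle, t_index, n),
          np.fromfile(handle, t_value, n * meta[4]),
          np.fromfile(handle, t_weight, n))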

File tree

9 files changed: +122 -115 lines changed

.coverage

-4 KB (binary file not shown)

python/ouroboros/helpers/files.py

Lines changed: 35 additions & 34 deletions
@@ -1,12 +1,12 @@
 from functools import partial
+from io import BytesIO
 from multiprocessing.pool import ThreadPool
 import os
 
 import numpy as np
 from numpy.typing import ArrayLike
 from pathlib import Path
 import cv2
-from tifffile import TiffWriter, TiffFile
 import time
 
 from .shapes import DataShape
@@ -171,60 +171,60 @@ def generate_tiff_write(write_func: callable, compression: str | None, micron_re
                         **kwargs)
 
 
-def write_small_intermediate(file_path: os.PathLike, *series):
-    with TiffWriter(file_path, append=True) as tif:
-        for entry in series:
-            tif.write(entry, dtype=entry.dtype)
+def write_raw_intermediate(target: BytesIO, *series):
+    for entry in series:
+        target.write(entry)
+    return target.tell()
 
 
 def ravel_map_2d(index, source_rows, target_rows, offset):
     return np.add.reduce(np.add(np.divmod(index, source_rows), offset) * ((target_rows, ), (np.uint32(1), )))
 
 
-def load_z_intermediate(path: Path, offset: int = 0):
-    with TiffFile(path) as tif:
-        meta = tif.series[offset].asarray()
-        source_rows, target_rows, offset_rows, offset_columns = meta
-        return (ravel_map_2d(tif.series[offset + 1].asarray(),
-                             source_rows, target_rows,
-                             ((offset_rows, ), (offset_columns, ))),
-                tif.series[offset + 2].asarray(),
-                tif.series[offset + 3].asarray())
+def load_raw_file_intermediate(handle: BytesIO):
+    meta = np.fromfile(handle, np.uint32, 6)
+    source_rows, target_rows, offset_rows, offset_columns, channel_count, data_length = meta
+    t_index, t_value, t_weight = [np.dtype(code.decode()).type for code in np.fromfile(handle, 'S8', 3)]
+    return (ravel_map_2d(np.fromfile(handle, t_index, data_length),
+                         source_rows, target_rows,
+                         ((offset_rows, ), (offset_columns, ))),
+            np.fromfile(handle, t_value, data_length * channel_count).reshape(-1, data_length),
+            np.fromfile(handle, t_weight, data_length))
 
 
 def increment_volume(path: Path, vol: np.ndarray, offset: int = 0, cleanup=False):
-    indicies, values, weights = load_z_intermediate(path, offset)
-    for i in range(0, vol.shape[0] - 1):
-        np.add.at(vol[i], indicies, np.atleast_2d(values)[i])
-    np.add.at(vol[-1], indicies, weights)
+    if isinstance(path, Path):
+        with open(path, "rb") as handle:
+            end = os.fstat(handle.fileno()).st_size
+            handle.seek(offset)
+            while handle.tell() < end:
+                indicies, values, weights = load_raw_file_intermediate(handle)
+                for i in range(0, vol.shape[0] - 1):
+                    np.add.at(vol[i], indicies, np.atleast_2d(values)[i])
+                np.add.at(vol[-1], indicies, weights)
 
     if cleanup:
        path.unlink()
 
 
 def volume_from_intermediates(path: Path, shape: DataShape, thread_count: int = 4):
-    vol = np.zeros((1 + shape.C, np.prod((shape.Y, shape.X))), dtype=np.float32)
-    with ThreadPool(thread_count) as pool:
-        if not path.exists():
-            # We don't have any intermediate(s) for this value, so return empty.
-            return vol[0]
-        elif path.is_dir():
-            pool.starmap(increment_volume, [(i, vol, 0, False) for i in path.glob("**/*.tif*")])
-        else:
-            with TiffFile(path) as tif:
-                offset_set = range(0, len(tif.series), 4)
-            pool.starmap(increment_volume, [(path, vol, i, False) for i in offset_set])
+    vol = np.zeros((2, np.prod((shape.Y, shape.X))), dtype=np.float32)
+    if path.is_dir():
+        with ThreadPool(thread_count) as pool:
+            pool.starmap(increment_volume, [(i, vol, 0, True) for i in path.glob("**/*.tif*")])
+    elif path.exists():
+        increment_volume(path, vol, 0, True)
 
-    nz = np.flatnonzero(vol[-1])
-    vol[:-1, nz] /= vol[-1, nz]
-    return vol[:-1]
+    nz = np.flatnonzero(vol[0])
+    vol[0, nz] /= vol[1, nz]
+    return vol[0]
 
 
 def write_conv_vol(writer: callable, source_path, shape, dtype, scaling, target_folder, index, interpolation):
     perf = {}
-    start = time.perf_counter()
+    vol_start = time.perf_counter()
     vol = volume_from_intermediates(source_path, shape)
-    perf["Merge Volume"] = time.perf_counter() - start
+    perf["Merge Volume"] = time.perf_counter() - vol_start
     if scaling is not None:
         start = time.perf_counter()
         # CV2 is only 2D but we're resizing from the 1D image anyway at the moment.
@@ -241,4 +241,5 @@ def write_conv_vol(writer: callable, source_path, shape, dtype, scaling, target_
     writer(target_folder.joinpath(f"{index}.tif"),
            data=np_convert(dtype, vol.T.reshape(shape.Y, shape.X, shape.C), normalize=False, safe_bool=True))
     perf["Write Merged"] = time.perf_counter() - start
+    perf["Total Chunk Merge"] = time.perf_counter() - vol_start
     return perf
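
To make the merge step concrete, here is a small self-contained sketch of the accumulate-then-normalize pattern behind the new two-row volume (values in row 0, weights in row 1), mirroring volume_from_intermediates for a single-channel volume; the toy arrays are illustrative:

import numpy as np

vol = np.zeros((2, 16), dtype=np.float32)   # row 0: values, row 1: weights
indices = np.array([3, 3, 7], dtype=np.uint32)
values = np.array([2.0, 4.0, 5.0], dtype=np.float32)
weights = np.array([1.0, 1.0, 0.5], dtype=np.float32)

np.add.at(vol[0], indices, values)   # duplicate indices accumulate
np.add.at(vol[1], indices, weights)

nz = np.flatnonzero(vol[0])
vol[0, nz] /= vol[1, nz]
print(vol[0, [3, 7]])                # weighted means: [3.0, 10.0]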

python/ouroboros/helpers/parse.py

Lines changed: 4 additions & 4 deletions
@@ -11,10 +11,10 @@ class CV_FORMAT(Enum):
     PRECOMPUTED = ["neuroglancer-precomputed"]
     ZARR = ["zarr", "zarr2", "zarr3"]
     N5 = ["n5"]
-
+
     def __str__(self):
         return f"{self.name.lower()}://"
-
+
     @classmethod
     def get(cls, suffix):
         for e in cls:
@@ -56,8 +56,8 @@ def parse_source(cls, value, handler: ValidatorFunctionWrapHandler) -> str:
         split_source = base_source.split("|")
         if len(split_source) > 1:
             kv_store = split_source[1].split(":")
-            base_source = f"{CV_FORMAT.get(kv_store[0])}{split_source[0]}{kv_store[1]}"
-
+            base_source = f"{CV_FORMAT.get(kv_store[0])}{split_source[0]}{kv_store[1]}"
+
         return SourceModel(url=base_source) if isinstance(source, SourceModel) else base_source
python/ouroboros/helpers/shapes.py

Lines changed: 0 additions & 5 deletions
@@ -2,7 +2,6 @@
 Module containing shapes of data.
 """
 from abc import ABC, abstractmethod
-from collections import namedtuple
 from dataclasses import dataclass, asdict, replace, fields, astuple, make_dataclass, Field, InitVar
 from functools import cached_property, reduce
 import operator
@@ -233,10 +232,6 @@ class Z(DataShape): Z: int  # noqa: E701,E702
 class GenericOrder(DataShape): A: int; B: int; C: int  # noqa: E701,E702
 
 
-# ????
-NPString = namedtuple("NPString", 'T')
-
-
 @dataclass
 class DataRange(object):
     start: DataShape; stop: DataShape; step: DataShape  # noqa: E701,E702

python/ouroboros/helpers/slice.py

Lines changed: 1 addition & 1 deletion
@@ -219,7 +219,7 @@ def backproject_box(bounding_box: BoundingBox, slice_rects: np.ndarray, slices:
     np.add.at(volume[-1], points + point_inc, c_weights)
 
     # Get indicies of the flattened Z-Y-X backprojected domain that have values.
-    nz_vol = np.flatnonzero(volume[-1])
+    nz_vol = np.flatnonzero(volume[-1]).astype(squish_type)
 
     # Return indicies and only the volume region with values.
     return nz_vol, volume[:-1, nz_vol].squeeze(), volume[-1, nz_vol].squeeze()
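
squish_type comes from outside this hunk and is not shown in the diff; the sketch below illustrates a plausible reading of what the commit message calls a "squeeze type" — the smallest dtype that can hold the largest flat index — using np.min_scalar_type as a stand-in, not the project's actual implementation:

import numpy as np

def squish_type_for(max_index: int) -> np.dtype:
    # Smallest scalar dtype that can represent max_index without overflow.
    return np.min_scalar_type(int(max_index))

flat = np.flatnonzero(np.array([0.0, 1.0, 0.0, 2.0]))
print(flat.astype(squish_type_for(flat.max())))  # dtype shrinks to uint8 here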

python/ouroboros/pipeline/backproject_pipeline.py

Lines changed: 41 additions & 37 deletions
@@ -11,6 +11,7 @@
 import time
 import traceback
 
+from filelock import FileLock
 import numpy as np
 
 from ouroboros.helpers.memory_usage import (
@@ -32,7 +33,7 @@
     join_path,
     generate_tiff_write,
     write_conv_vol,
-    write_small_intermediate
+    write_raw_intermediate
 )
 from ouroboros.helpers.shapes import DataRange, ImgSliceC
 
@@ -127,19 +128,16 @@ def _process(self, input_data: any) -> tuple[any, None] | tuple[None, any]:
 
         straightened_volume_path = new_straightened_volume_path
 
-        # Write huge temp files (need to address)
         full_bounding_box = BoundingBox.bound_boxes(volume_cache.bounding_boxes)
         write_shape = np.flip(full_bounding_box.get_shape()).tolist()
-        print(f"\nFront Projection Shape: {FPShape}")
-        print(f"\nBack Projection Shape (Z/Y/X):{write_shape}")
 
         pipeline_input.output_file_path = (f"{config.output_file_name}_"
                                            f"{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}")
         folder_path = Path(config.output_file_folder, pipeline_input.output_file_path)
         folder_path.mkdir(exist_ok=True, parents=True)
 
-        i_path = Path(config.output_file_folder,
-                      f"{config.output_file_name}_t_{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}")
+        # Intermediate Path
+        i_path = Path(config.output_file_folder, f"{os.getpid()}_{config.output_file_name}")
 
         if config.make_single_file:
             is_big_tiff = calculate_gigabytes_from_dimensions(
@@ -189,12 +187,12 @@ def _process(self, input_data: any) -> tuple[any, None] | tuple[None, any]:
             for chunk, _, chunk_rects, _, index in chunk_range.get_iter(chunk_iter):
                 bp_futures.append(executor.submit(
                     process_chunk,
-                    config,
-                    straightened_volume_path,
-                    chunk_rects,
-                    chunk,
-                    index,
-                    full_bounding_box
+                    config=config,
+                    straightened_volume_path=straightened_volume_path,
+                    chunk_rects=chunk_rects,
+                    chunk=chunk,
+                    index=index,
+                    full_bounding_box=full_bounding_box
                 ))
 
             # Track what's written.
@@ -206,8 +204,8 @@ def _process(self, input_data: any) -> tuple[any, None] | tuple[None, any]:
             def note_written(write_future):
                 nonlocal pages_written
                 pages_written += 1
-                self.update_progress((np.sum(processed) / len(chunk_range)) * (2 / 3)
-                                     + (pages_written / num_pages) * (1 / 3))
+                self.update_progress((np.sum(processed) / len(chunk_range)) * (exec_procs / self.num_processes)
+                                     + (pages_written / num_pages) * (write_procs / self.num_processes))
                 for key, value in write_future.result().items():
                     self.add_timing(key, value)
 
@@ -222,8 +220,8 @@ def note_written(write_future):
 
                 # Update the progress bar
                 processed[index] = 1
-                self.update_progress((np.sum(processed) / len(chunk_range)) * (2 / 3)
-                                     + (pages_written / num_pages) * (1 / 3))
+                self.update_progress((np.sum(processed) / len(chunk_range)) * (exec_procs / self.num_processes)
+                                     + (pages_written / num_pages) * (write_procs / self.num_processes))
 
                 update_writable_rects(processed, slice_rects, min_dim, writeable, DEFAULT_CHUNK_SIZE)
 
@@ -233,14 +231,14 @@ def note_written(write_future):
                 for index in write:
                     write_futures.append(write_executor.submit(
                         write_conv_vol,
-                        tif_write(tifffile.imwrite),
-                        i_path.joinpath(f"i_{index:05}"),
-                        ImgSliceC(*write_shape[1:], channels),
-                        bool if config.make_backprojection_binary else np.uint16,
-                        scaling_factors,
-                        folder_path,
-                        index,
-                        config.upsample_order
+                        writer=tif_write(tifffile.imwrite),
+                        source_path=i_path.joinpath(f"i_{index:05}.dat"),
+                        shape=ImgSliceC(*write_shape[1:], channels),
+                        dtype=bool if config.make_backprojection_binary else np.uint16,
+                        scaling=scaling_factors,
+                        target_folder=folder_path,
+                        index=index,
+                        interpolation=config.upsample_order
                     ))
                     write_futures[-1].add_done_callback(note_written)
 
@@ -271,8 +269,7 @@ def note_written(write_future):
 
         if config.make_single_file:
            shutil.rmtree(folder_path)
-           shutil.rmtree(Path(config.output_file_folder,
-                              f"{config.output_file_name}_t_{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}"))
+        shutil.rmtree(i_path)
 
         return None
 
@@ -320,7 +317,7 @@ def process_chunk(
 
     if values.nbytes == 0:
         # No data to write from this chunk, so return as such.
-        durations["total_process"] = [time.perf_counter() - start_total]
+        durations["total_chunk_process"] = [time.perf_counter() - start_total]
         return durations, index, []
 
     # Save the data
@@ -336,7 +333,9 @@ def process_chunk(
             "target_rows": full_bounding_box.get_shape()[0],
             "offset_columns": offset[1],
             "offset_rows": offset[2],
+            "channel_count": np.uint32(1 if len(slices.shape) < 4 else slices.shape[-1]),
         }
+        type_ar = np.array([yx_vals.dtype.str, values.dtype.str, weights.dtype.str], dtype='S8')
         durations["split"] = [time.perf_counter() - start]
 
         # Gets slices off full array corresponding to each Z value.
@@ -347,27 +346,32 @@ def process_chunk(
         durations["stack"] = [time.perf_counter() - start]
         start = time.perf_counter()
 
-        file_path = Path(config.output_file_folder,
-                         f"{config.output_file_name}_t_{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}")
-        file_path.mkdir(exist_ok=True, parents=True)
+        i_path = Path(config.output_file_folder, f"{os.getppid()}_{config.output_file_name}")
+        i_path.mkdir(exist_ok=True, parents=True)
 
-        def write_z(i, z_slice):
+        def write_z(target, z_slice):
+            write_raw_intermediate(target,
+                                   np.fromiter(offset_dict.values(), dtype=np.uint32, count=5).tobytes(),
+                                   np.uint32(len(yx_vals[z_slice])).tobytes(),
+                                   type_ar.tobytes(),
+                                   yx_vals[z_slice].tobytes(), values[z_slice].tobytes(), weights[z_slice].tobytes())
+
+        def make_z(i, z_slice):
             offset_z = z_stack[i] + offset[0]
-            file_path.joinpath(f"i_{offset_z:05}").mkdir(exist_ok=True, parents=True)
-            write_small_intermediate(file_path.joinpath(f"i_{offset_z:05}", f"{index}.tif"),
-                                     np.fromiter(offset_dict.values(), dtype=np.uint32, count=4),
-                                     yx_vals[z_slice], np.atleast_2d(values)[:, z_slice], weights[z_slice])
+            z_path = i_path.joinpath(f"i_{offset_z:05}.dat")
+            with FileLock(z_path.with_suffix(".lock")):
+                write_z(open(z_path, "ab"), z_slice)
 
         with ThreadPool(12) as pool:
-            pool.starmap(write_z, enumerate(z_slices))
+            pool.starmap(make_z, enumerate(z_slices))
 
         durations["write_intermediate"] = [time.perf_counter() - start]
     except BaseException as be:
         print(f"Error on BP: {be}")
         traceback.print_tb(be.__traceback__, file=sys.stderr)
         raise be
 
-    durations["total_process"] = [time.perf_counter() - start_total]
+    durations["total_chunk_process"] = [time.perf_counter() - start_total]
 
     return durations, index, z_stack + offset[0]
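
The concurrency pattern in make_z is worth spelling out: multiple chunk workers can append records to the same per-Z file because each write happens under an exclusive FileLock, so records stay sequential and can later be scanned back-to-back by increment_volume. A minimal standalone sketch of that pattern (the path and payload are illustrative):

from pathlib import Path
from filelock import FileLock

def append_record(z_path: Path, payload: bytes):
    # Lock file lives next to the data file; only one writer appends at a time.
    with FileLock(z_path.with_suffix(".lock")):
        with open(z_path, "ab") as target:   # append keeps records sequential
            target.write(payload)

append_record(Path("/tmp/i_00001.dat"), b"\x00" * 8)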
