Binary file modified .coverage
3 changes: 2 additions & 1 deletion python/Dockerfile
@@ -8,7 +8,8 @@ ENV POETRY_NO_INTERACTION=1 \
RUN apt-get update -y
RUN apt-get install \
gcc \
libpq-dev
libpq-dev \
cargo

WORKDIR /app

3 changes: 2 additions & 1 deletion python/Dockerfile-prod
@@ -7,7 +7,8 @@ FROM thehale/python-poetry:2.1.3-py3.11-slim as python-base
RUN apt-get update -y
RUN apt-get install -y \
gcc \
libpq-dev
libpq-dev \
cargo

COPY ./dist/*.whl ./

69 changes: 35 additions & 34 deletions python/ouroboros/helpers/files.py
@@ -1,12 +1,12 @@
from functools import partial
from io import BytesIO
from multiprocessing.pool import ThreadPool
import os

import numpy as np
from numpy.typing import ArrayLike
from pathlib import Path
import cv2
from tifffile import TiffWriter, TiffFile
import time

from .shapes import DataShape
@@ -171,60 +171,60 @@ def generate_tiff_write(write_func: callable, compression: str | None, micron_re
**kwargs)


def write_small_intermediate(file_path: os.PathLike, *series):
with TiffWriter(file_path, append=True) as tif:
for entry in series:
tif.write(entry, dtype=entry.dtype)
def write_raw_intermediate(target: BytesIO, *series):
for entry in series:
target.write(entry)
return target.tell()


def ravel_map_2d(index, source_rows, target_rows, offset):
return np.add.reduce(np.add(np.divmod(index, source_rows), offset) * ((target_rows, ), (np.uint32(1), )))


def load_z_intermediate(path: Path, offset: int = 0):
with TiffFile(path) as tif:
meta = tif.series[offset].asarray()
source_rows, target_rows, offset_rows, offset_columns = meta
return (ravel_map_2d(tif.series[offset + 1].asarray(),
source_rows, target_rows,
((offset_rows, ), (offset_columns, ))),
tif.series[offset + 2].asarray(),
tif.series[offset + 3].asarray())
def load_raw_file_intermediate(handle: BytesIO):
meta = np.fromfile(handle, np.uint32, 6)
source_rows, target_rows, offset_rows, offset_columns, channel_count, data_length = meta
t_index, t_value, t_weight = [np.dtype(code.decode()).type for code in np.fromfile(handle, 'S8', 3)]
return (ravel_map_2d(np.fromfile(handle, t_index, data_length),
source_rows, target_rows,
((offset_rows, ), (offset_columns, ))),
np.fromfile(handle, t_value, data_length * channel_count).reshape(-1, data_length),
np.fromfile(handle, t_weight, data_length))


def increment_volume(path: Path, vol: np.ndarray, offset: int = 0, cleanup=False):
indicies, values, weights = load_z_intermediate(path, offset)
for i in range(0, vol.shape[0] - 1):
np.add.at(vol[i], indicies, np.atleast_2d(values)[i])
np.add.at(vol[-1], indicies, weights)
if isinstance(path, Path):
with open(path, "rb") as handle:
end = os.fstat(handle.fileno()).st_size
handle.seek(offset)
while handle.tell() < end:
indicies, values, weights = load_raw_file_intermediate(handle)
for i in range(0, vol.shape[0] - 1):
np.add.at(vol[i], indicies, np.atleast_2d(values)[i])
np.add.at(vol[-1], indicies, weights)

if cleanup:
path.unlink()


def volume_from_intermediates(path: Path, shape: DataShape, thread_count: int = 4):
vol = np.zeros((1 + shape.C, np.prod((shape.Y, shape.X))), dtype=np.float32)
with ThreadPool(thread_count) as pool:
if not path.exists():
# We don't have any intermediate(s) for this value, so return empty.
return vol[0]
elif path.is_dir():
pool.starmap(increment_volume, [(i, vol, 0, False) for i in path.glob("**/*.tif*")])
else:
with TiffFile(path) as tif:
offset_set = range(0, len(tif.series), 4)
pool.starmap(increment_volume, [(path, vol, i, False) for i in offset_set])
vol = np.zeros((2, np.prod((shape.Y, shape.X))), dtype=np.float32)
if path.is_dir():
with ThreadPool(thread_count) as pool:
pool.starmap(increment_volume, [(i, vol, 0, True) for i in path.glob("**/*.tif*")])
elif path.exists():
increment_volume(path, vol, 0, True)

nz = np.flatnonzero(vol[-1])
vol[:-1, nz] /= vol[-1, nz]
return vol[:-1]
nz = np.flatnonzero(vol[0])
vol[0, nz] /= vol[1, nz]
return vol[0]


def write_conv_vol(writer: callable, source_path, shape, dtype, scaling, target_folder, index, interpolation):
perf = {}
start = time.perf_counter()
vol_start = time.perf_counter()
vol = volume_from_intermediates(source_path, shape)
perf["Merge Volume"] = time.perf_counter() - start
perf["Merge Volume"] = time.perf_counter() - vol_start
if scaling is not None:
start = time.perf_counter()
# CV2 is only 2D but we're resizing from the 1D image anyway at the moment.
@@ -241,4 +241,5 @@ def write_conv_vol(writer: callable, source_path, shape, dtype, scaling, target_
writer(target_folder.joinpath(f"{index}.tif"),
data=np_convert(dtype, vol.T.reshape(shape.Y, shape.X, shape.C), normalize=False, safe_bool=True))
perf["Write Merged"] = time.perf_counter() - start
perf["Total Chunk Merge"] = time.perf_counter() - vol_start
return perf
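
Note on the new intermediate format: `write_small_intermediate` (a multi-series TIFF) is replaced by `write_raw_intermediate` / `load_raw_file_intermediate`, which stream each record as raw bytes: a six-value `uint32` header (`source_rows`, `target_rows`, `offset_rows`, `offset_columns`, `channel_count`, `data_length`), three `'S8'` dtype codes, then the index, value, and weight arrays. Below is a minimal round-trip sketch of that layout; the header order follows the loader in this diff, while the file path and sample values are invented for illustration.

```python
# Round-trip sketch of one raw intermediate record (illustrative values only).
import tempfile
from pathlib import Path
import numpy as np

indices = np.array([3, 7, 11], dtype=np.uint32)            # flattened Y*X positions
values = np.array([[10.0, 20.0, 30.0]], dtype=np.float32)  # one channel of data
weights = np.array([1.0, 0.5, 2.0], dtype=np.float32)

# Header order mirrors load_raw_file_intermediate:
# source_rows, target_rows, offset_rows, offset_columns, channel_count, data_length
header = np.array([512, 2048, 0, 0, values.shape[0], indices.size], dtype=np.uint32)
dtype_codes = np.array([indices.dtype.str, values.dtype.str, weights.dtype.str], dtype='S8')

record_path = Path(tempfile.mkdtemp()) / "i_00042.dat"     # hypothetical per-Z file
with open(record_path, "ab") as target:                    # records append back to back
    for entry in (header.tobytes(), dtype_codes.tobytes(),
                  indices.tobytes(), values.tobytes(), weights.tobytes()):
        target.write(entry)

with open(record_path, "rb") as handle:
    meta = np.fromfile(handle, np.uint32, 6)
    t_idx, t_val, t_wgt = [np.dtype(c.decode()).type for c in np.fromfile(handle, 'S8', 3)]
    idx = np.fromfile(handle, t_idx, meta[5])
    vals = np.fromfile(handle, t_val, meta[5] * meta[4]).reshape(-1, meta[5])
    wgts = np.fromfile(handle, t_wgt, meta[5])
```

`increment_volume` repeats this read until the end of the file, remapping `idx` into the full bounding box with `ravel_map_2d` before accumulating values and weights.
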
8 changes: 4 additions & 4 deletions python/ouroboros/helpers/parse.py
@@ -11,10 +11,10 @@ class CV_FORMAT(Enum):
PRECOMPUTED = ["neuroglancer-precomputed"]
ZARR = ["zarr", "zarr2", "zarr3"]
N5 = ["n5"]

def __str__(self):
return f"{self.name.lower()}://"

@classmethod
def get(cls, suffix):
for e in cls:
@@ -56,8 +56,8 @@ def parse_source(cls, value, handler: ValidatorFunctionWrapHandler) -> str:
split_source = base_source.split("|")
if len(split_source) > 1:
kv_store = split_source[1].split(":")
base_source = f"{CV_FORMAT.get(kv_store[0])}{split_source[0]}{kv_store[1]}"
base_source = f"{CV_FORMAT.get(kv_store[0])}{split_source[0]}{kv_store[1]}"

return SourceModel(url=base_source) if isinstance(source, SourceModel) else base_source


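
Note: the reindented block in `parse_source` rebuilds a `path|format:suffix` source string into a `format://` URL via `CV_FORMAT.get`. A hedged sketch of that string handling, with a plain f-string standing in for the enum lookup and an invented example source:

```python
# Sketch of the "path|format:suffix" rewrite in parse_source; input is hypothetical.
def rebuild_source(base_source: str) -> str:
    split_source = base_source.split("|")
    if len(split_source) > 1:
        kv_store = split_source[1].split(":")
        # CV_FORMAT.get(kv_store[0]) would resolve e.g. "zarr" to "zarr://".
        base_source = f"{kv_store[0]}://{split_source[0]}{kv_store[1]}"
    return base_source

print(rebuild_source("gs://bucket/volume.zarr|zarr:/0"))  # zarr://gs://bucket/volume.zarr/0
```
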
5 changes: 0 additions & 5 deletions python/ouroboros/helpers/shapes.py
@@ -2,7 +2,6 @@
Module containing shapes of data.
"""
from abc import ABC, abstractmethod
from collections import namedtuple
from dataclasses import dataclass, asdict, replace, fields, astuple, make_dataclass, Field, InitVar
from functools import cached_property, reduce
import operator
@@ -233,10 +232,6 @@ class Z(DataShape): Z: int # noqa: E701,E702
class GenericOrder(DataShape): A: int; B: int; C: int # noqa: E701,E702


# ????
NPString = namedtuple("NPString", 'T')


@dataclass
class DataRange(object):
start: DataShape; stop: DataShape; step: DataShape # noqa: E701,E702
2 changes: 1 addition & 1 deletion python/ouroboros/helpers/slice.py
@@ -219,7 +219,7 @@ def backproject_box(bounding_box: BoundingBox, slice_rects: np.ndarray, slices:
np.add.at(volume[-1], points + point_inc, c_weights)

# Get indicies of the flattened Z-Y-X backprojected domain that have values.
nz_vol = np.flatnonzero(volume[-1])
nz_vol = np.flatnonzero(volume[-1]).astype(squish_type)

# Return indicies and only the volume region with values.
return nz_vol, volume[:-1, nz_vol].squeeze(), volume[-1, nz_vol].squeeze()
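
Note: the only change here narrows the dtype of the flattened non-zero indices. `squish_type` is not shown in this hunk, so the sketch below only assumes it is an unsigned integer wide enough for `Y * X` and illustrates the memory saving for the intermediate records:

```python
# Memory effect of downcasting flatnonzero output; np.uint32 stands in for squish_type.
import numpy as np

weights = np.zeros(2048 * 2048, dtype=np.float32)
weights[::7] = 1.0

nz64 = np.flatnonzero(weights)      # int64 indices by default
nz32 = nz64.astype(np.uint32)       # assumed squish_type
print(nz64.nbytes, nz32.nbytes)     # the cast halves the index payload
```
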
78 changes: 41 additions & 37 deletions python/ouroboros/pipeline/backproject_pipeline.py
@@ -11,6 +11,7 @@
import time
import traceback

from filelock import FileLock
import numpy as np

from ouroboros.helpers.memory_usage import (
@@ -32,7 +33,7 @@
join_path,
generate_tiff_write,
write_conv_vol,
write_small_intermediate
write_raw_intermediate
)
from ouroboros.helpers.shapes import DataRange, ImgSliceC

@@ -127,19 +128,16 @@ def _process(self, input_data: any) -> tuple[any, None] | tuple[None, any]:

straightened_volume_path = new_straightened_volume_path

# Write huge temp files (need to address)
full_bounding_box = BoundingBox.bound_boxes(volume_cache.bounding_boxes)
write_shape = np.flip(full_bounding_box.get_shape()).tolist()
print(f"\nFront Projection Shape: {FPShape}")
print(f"\nBack Projection Shape (Z/Y/X):{write_shape}")

pipeline_input.output_file_path = (f"{config.output_file_name}_"
f"{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}")
folder_path = Path(config.output_file_folder, pipeline_input.output_file_path)
folder_path.mkdir(exist_ok=True, parents=True)

i_path = Path(config.output_file_folder,
f"{config.output_file_name}_t_{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}")
# Intermediate Path
i_path = Path(config.output_file_folder, f"{os.getpid()}_{config.output_file_name}")

if config.make_single_file:
is_big_tiff = calculate_gigabytes_from_dimensions(
@@ -189,12 +187,12 @@ def _process(self, input_data: any) -> tuple[any, None] | tuple[None, any]:
for chunk, _, chunk_rects, _, index in chunk_range.get_iter(chunk_iter):
bp_futures.append(executor.submit(
process_chunk,
config,
straightened_volume_path,
chunk_rects,
chunk,
index,
full_bounding_box
config=config,
straightened_volume_path=straightened_volume_path,
chunk_rects=chunk_rects,
chunk=chunk,
index=index,
full_bounding_box=full_bounding_box
))

# Track what's written.
@@ -206,8 +204,8 @@ def note_written(write_future):
def note_written(write_future):
nonlocal pages_written
pages_written += 1
self.update_progress((np.sum(processed) / len(chunk_range)) * (2 / 3)
+ (pages_written / num_pages) * (1 / 3))
self.update_progress((np.sum(processed) / len(chunk_range)) * (exec_procs / self.num_processes)
+ (pages_written / num_pages) * (write_procs / self.num_processes))
for key, value in write_future.result().items():
self.add_timing(key, value)

@@ -222,8 +220,8 @@

# Update the progress bar
processed[index] = 1
self.update_progress((np.sum(processed) / len(chunk_range)) * (2 / 3)
+ (pages_written / num_pages) * (1 / 3))
self.update_progress((np.sum(processed) / len(chunk_range)) * (exec_procs / self.num_processes)
+ (pages_written / num_pages) * (write_procs / self.num_processes))

update_writable_rects(processed, slice_rects, min_dim, writeable, DEFAULT_CHUNK_SIZE)

@@ -233,14 +231,14 @@ def note_written(write_future):
for index in write:
write_futures.append(write_executor.submit(
write_conv_vol,
tif_write(tifffile.imwrite),
i_path.joinpath(f"i_{index:05}"),
ImgSliceC(*write_shape[1:], channels),
bool if config.make_backprojection_binary else np.uint16,
scaling_factors,
folder_path,
index,
config.upsample_order
writer=tif_write(tifffile.imwrite),
source_path=i_path.joinpath(f"i_{index:05}.dat"),
shape=ImgSliceC(*write_shape[1:], channels),
dtype=bool if config.make_backprojection_binary else np.uint16,
scaling=scaling_factors,
target_folder=folder_path,
index=index,
interpolation=config.upsample_order
))
write_futures[-1].add_done_callback(note_written)

@@ -271,8 +269,7 @@ def note_written(write_future):

if config.make_single_file:
shutil.rmtree(folder_path)
shutil.rmtree(Path(config.output_file_folder,
f"{config.output_file_name}_t_{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}"))
shutil.rmtree(i_path)

return None

@@ -320,7 +317,7 @@ def process_chunk(

if values.nbytes == 0:
# No data to write from this chunk, so return as such.
durations["total_process"] = [time.perf_counter() - start_total]
durations["total_chunk_process"] = [time.perf_counter() - start_total]
return durations, index, []

# Save the data
@@ -336,7 +333,9 @@
"target_rows": full_bounding_box.get_shape()[0],
"offset_columns": offset[1],
"offset_rows": offset[2],
"channel_count": np.uint32(1 if len(slices.shape) < 4 else slices.shape[-1]),
}
type_ar = np.array([yx_vals.dtype.str, values.dtype.str, weights.dtype.str], dtype='S8')
durations["split"] = [time.perf_counter() - start]

# Gets slices off full array corresponding to each Z value.
@@ -347,27 +346,32 @@
durations["stack"] = [time.perf_counter() - start]
start = time.perf_counter()

file_path = Path(config.output_file_folder,
f"{config.output_file_name}_t_{'_'.join(map(str, full_bounding_box.get_min(np.uint32)))}")
file_path.mkdir(exist_ok=True, parents=True)
i_path = Path(config.output_file_folder, f"{os.getppid()}_{config.output_file_name}")
i_path.mkdir(exist_ok=True, parents=True)

def write_z(i, z_slice):
def write_z(target, z_slice):
write_raw_intermediate(target,
np.fromiter(offset_dict.values(), dtype=np.uint32, count=5).tobytes(),
np.uint32(len(yx_vals[z_slice])).tobytes(),
type_ar.tobytes(),
yx_vals[z_slice].tobytes(), values[z_slice].tobytes(), weights[z_slice].tobytes())

def make_z(i, z_slice):
offset_z = z_stack[i] + offset[0]
file_path.joinpath(f"i_{offset_z:05}").mkdir(exist_ok=True, parents=True)
write_small_intermediate(file_path.joinpath(f"i_{offset_z:05}", f"{index}.tif"),
np.fromiter(offset_dict.values(), dtype=np.uint32, count=4),
yx_vals[z_slice], np.atleast_2d(values)[:, z_slice], weights[z_slice])
z_path = i_path.joinpath(f"i_{offset_z:05}.dat")
with FileLock(z_path.with_suffix(".lock")):
write_z(open(z_path, "ab"), z_slice)

with ThreadPool(12) as pool:
pool.starmap(write_z, enumerate(z_slices))
pool.starmap(make_z, enumerate(z_slices))

durations["write_intermediate"] = [time.perf_counter() - start]
except BaseException as be:
print(f"Error on BP: {be}")
traceback.print_tb(be.__traceback__, file=sys.stderr)
raise be

durations["total_process"] = [time.perf_counter() - start_total]
durations["total_chunk_process"] = [time.perf_counter() - start_total]

return durations, index, z_stack + offset[0]
