Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 50 additions & 37 deletions flamingo_tools/data_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@

from glob import glob
from pathlib import Path
from typing import Optional, List, Dict
from typing import Optional, List, Dict, Tuple

import numpy as np
import pybdv
import tifffile

from cluster_tools.utils.volume_utils import write_format_metadata
from elf.io import open_file
from skimage.transform import rescale

from .file_utils import read_tif, read_raw


def _read_resolution_and_unit_flamingo(mdata_path):
resolution = None
Expand Down Expand Up @@ -60,7 +61,27 @@ def _read_start_position_flamingo(path):
return start_position


def read_metadata_flamingo(metadata_path, offset=None, parse_affine=False):
def read_metadata_flamingo(
metadata_path: str,
offset: Optional[np.ndarray] = None,
parse_affine: bool = False
) -> Tuple[List[float], str, List[float]]:
"""Read acquisition metadata from a flamingo metadata file.

This will read the resolution, the physical unit, and optionally the
voxel grid transformation from the metadata file. The voxel grid transformation
places the tiles at their correct positions.

Args:
metadata_path: The path to the metadata file.
offset: The spatial offset of this data.
parse_affine: Whether to read the affine transformation from the metadata.

Returns:
The resolution / voxel size of the data.
The physical unit of the voxel size.
The affine voxel grid transformation of the data.
"""
resolution, unit = None, None

resolution, unit = _read_resolution_and_unit_flamingo(metadata_path)
Expand Down Expand Up @@ -109,7 +130,7 @@ def _pos_to_trafo(pos):


# TODO derive the scale factors from the shape rather than hard-coding it to 5 levels
def derive_scale_factors(shape):
def _derive_scale_factors(shape):
scale_factors = [[2, 2, 2]] * 5
return scale_factors

Expand Down Expand Up @@ -147,11 +168,25 @@ def _to_ome_zarr(data, out_path, scale_factors, timepoint, setup_id, attributes,
)


def flamingo_filename_parser(file_path, name_mapping):
def flamingo_filename_parser(file_path: str, name_mapping: Optional[Dict]) -> Tuple[int, Dict[str, str], str]:
"""Parse the name of flamingo output files.

This maps the filenames to the corresponding timepoint, the BigStitcher
compatible attributes, and the id (name) of the attributes.

Args:
file_path: The path to the flamingo data.
name_mapping: Optional mapping of parsed attributes to their actual names.

Returns:
The timepoint of this data.
The dictionary mapping attribute names to their values.
The normalized attribute names.
"""
filename = os.path.basename(file_path)

# Extract the timepoint.
match = re.search(r'_t(\d+)_', filename)
match = re.search(r"_t(\d+)_", filename)
if match:
timepoint = int(match.group(1))
else:
Expand All @@ -163,25 +198,25 @@ def flamingo_filename_parser(file_path, name_mapping):
name_mapping = {}

# Extract the channel.
match = re.search(r'_C(\d+)_', filename)
match = re.search(r"_C(\d+)_", filename)
channel = int(match.group(1)) if match else 0
channel_mapping = name_mapping.get("channel", {})
attributes["channel"] = {"id": channel, "name": channel_mapping.get(channel, str(channel))}

# Extract the tile.
match = re.search(r'_R(\d+)_', filename)
match = re.search(r"_R(\d+)_", filename)
tile = int(match.group(1)) if match else 0
tile_mapping = name_mapping.get("tile", {})
attributes["tile"] = {"id": tile, "name": tile_mapping.get(tile, str(tile))}

# Extract the illumination.
match = re.search(r'_I(\d+)_', filename)
match = re.search(r"_I(\d+)_", filename)
illumination = int(match.group(1)) if match else 0
illumination_mapping = name_mapping.get("illumination", {})
attributes["illumination"] = {"id": illumination, "name": illumination_mapping.get(illumination, str(illumination))}

# Extract D. TODO what is this?
match = re.search(r'_D(\d+)_', filename)
match = re.search(r"_D(\d+)_", filename)
D = int(match.group(1)) if match else 0
D_mapping = name_mapping.get("D", {})
attributes["D"] = {"id": D, "name": D_mapping.get(D, str(D))}
Expand All @@ -207,35 +242,11 @@ def _write_missing_views(out_path):
tree.write(xml_path)


def _parse_shape(metadata_file):
depth, height, width = None, None, None

with open(metadata_file, "r") as f:
for line in f.readlines():
line = line.strip().rstrip("\n")
if line.startswith("AOI width"):
width = int(line.split(" ")[-1])
if line.startswith("AOI height"):
height = int(line.split(" ")[-1])
if line.startswith("Number of planes saved"):
depth = int(line.split(" ")[-1])

assert depth is not None
assert height is not None
assert width is not None
return (depth, height, width)


def _load_data(file_path, metadata_file):
    """Load the image data from a raw or a tif file.

    Args:
        file_path: The path to the image file. A file with the suffix ".raw" is
            read with `read_raw`, every other file is read with `read_tif`.
        metadata_file: The path to the metadata file. Only used for raw files.

    Returns:
        The data returned by `read_raw` or `read_tif`.
    """
    # The file reading logic lives in file_utils; loading the data inline here as well
    # would do the work twice, since the result is overwritten by the helper's result.
    if Path(file_path).suffix == ".raw":
        return read_raw(file_path, metadata_file)
    return read_tif(file_path)


Expand Down Expand Up @@ -360,7 +371,7 @@ def convert_lightsheet_to_bdv(
print(f"Converting tp={timepoint}, channel={attributes['channel']}, tile={attributes['tile']}")
data = _load_data(file_path, metadata_file)
if scale_factors is None:
scale_factors = derive_scale_factors(data.shape)
scale_factors = _derive_scale_factors(data.shape)

if convert_to_ome_zarr:
_to_ome_zarr(data, out_path, scale_factors, timepoint, setup_id, attributes, unit, resolution)
Expand All @@ -387,6 +398,8 @@ def convert_lightsheet_to_bdv(


def convert_lightsheet_to_bdv_cli():
"""@private
"""
import argparse

parser = argparse.ArgumentParser(
Expand Down
85 changes: 85 additions & 0 deletions flamingo_tools/file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import warnings
from typing import Optional, Union

import imageio.v3 as imageio
import numpy as np
import tifffile
import zarr
from elf.io import open_file


def _parse_shape(metadata_file):
    """Parse the shape of the data from a flamingo metadata file.

    Scans the file for the lines starting with "AOI width", "AOI height" and
    "Number of planes saved" and converts the last whitespace-separated token
    of each of these lines to an integer. If an entry occurs more than once,
    the last occurrence is used.

    Args:
        metadata_file: The path to the metadata text file.

    Returns:
        The shape as a tuple (depth, height, width).

    Raises:
        ValueError: If one of the three entries is missing from the file,
            or if the value of an entry cannot be converted to an integer.
    """
    # Maps the line prefix in the metadata file to the shape axis it describes.
    prefix_to_axis = {
        "Number of planes saved": "depth",
        "AOI height": "height",
        "AOI width": "width",
    }
    shape = dict.fromkeys(prefix_to_axis.values())

    with open(metadata_file, "r") as f:
        # Iterate over the file lazily instead of materializing all lines.
        for line in f:
            line = line.strip()
            for prefix, axis in prefix_to_axis.items():
                if line.startswith(prefix):
                    # Split on any whitespace so that tabs or repeated spaces are handled too.
                    shape[axis] = int(line.split()[-1])
                    break

    # Raise explicitly instead of using assert, which is skipped when running with -O.
    missing = [axis for axis, value in shape.items() if value is None]
    if missing:
        raise ValueError(
            f"Could not parse {', '.join(missing)} from the metadata file {metadata_file}."
        )
    return (shape["depth"], shape["height"], shape["width"])


def read_raw(file_path: str, metadata_file: str) -> np.memmap:
    """Memory-map a raw file written by the flamingo microscope.

    The raw file is opened read-only and interpreted as uint16 data. Its shape
    is parsed from the accompanying metadata file.

    Args:
        file_path: The file path to the raw file.
        metadata_file: The file path to the metadata describing the raw file.
            It is passed to `_parse_shape` to determine the shape of the data.

    Returns:
        The memory-mapped data.
    """
    return np.memmap(file_path, mode="r", dtype="uint16", shape=_parse_shape(metadata_file))


def read_tif(file_path: str) -> Union[np.ndarray, np.memmap]:
    """Read a tif file.

    Tries to memory-map the file in read-only mode. If `tifffile.memmap` raises
    a ValueError, a warning is issued and the complete file is loaded into
    memory instead.

    Args:
        file_path: The file path to the tif file.

    Returns:
        The memory-mapped data. If it is not possible to memory-map the file, the data in memory.
    """
    try:
        # tifffile.memmap opens files in "r+" (read-write) mode by default.
        # Request read-only access: reading must neither require write permission
        # nor hand out an array that modifies the file on disk.
        x = tifffile.memmap(file_path, mode="r")
    except ValueError:
        # stacklevel=2 attributes the warning to the caller of read_tif.
        warnings.warn(
            f"Cannot memmap the tif file at {file_path}. Fall back to loading it into memory.",
            stacklevel=2,
        )
        x = imageio.imread(file_path)
    return x


def read_image_data(input_path: Union[str, zarr.storage.FSStore], input_key: Optional[str]) -> np.typing.ArrayLike:
    """Read flamingo image data that is stored in one of several formats.

    The reader is selected as follows:
    - If `input_key` is None, `input_path` is read with `read_tif`.
    - Otherwise, if `input_path` is a string, it is opened with `open_file`
      and the entry at `input_key` is returned.
    - Otherwise, `input_path` is opened with `zarr.open` and the entry at
      `input_key` is returned.

    Args:
        input_path: The file path to the data, or a zarr S3 store for data remotely accessed on S3.
            The data can be stored as a tif file, or a zarr/n5 container.
            Access via S3 is only supported for a zarr container.
        input_key: The key (= internal path) for a zarr or n5 container.
            Set it to None if the data is stored in a tif file.

    Returns:
        The data, loaded either as a numpy mem-map, a numpy array, or a zarr / n5 array.
    """
    # Without a key the data is stored in a tif file.
    if input_key is None:
        return read_tif(input_path)

    # A string path refers to a container that can be opened with open_file.
    if isinstance(input_path, str):
        container = open_file(input_path, "r")
        return container[input_key]

    # Anything else is treated as a store and opened with zarr.
    with zarr.open(input_path, mode="r") as root:
        dataset = root[input_key]
    return dataset
Loading