4 changes: 1 addition & 3 deletions .github/workflows/build_and_test.yml
@@ -17,10 +17,8 @@ jobs:
     strategy:
       matrix:
         os: [ubuntu-22.04, macos-latest]
-        python-version: ["3.9", "3.10", "3.11"]
+        python-version: ["3.10", "3.11", "3.12"]
         exclude:
-          - os: macos-latest
-            python-version: '3.9'
           - os: macos-latest
             python-version: '3.10'
     name: "Core, Python ${{ matrix.python-version }}, ${{ matrix.os }}"
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -25,9 +25,9 @@ authors = [
 ]
 
 # Required Python version and dependencies
-requires-python = ">=3.9"
+requires-python = ">=3.10"
 dependencies = [
-    "fractal-tasks-core==1.3.4"
+    "fractal-tasks-core==1.4.0","ngio==0.1.4",
 ]
 
 # Optional dependencies (e.g. for `pip install -e ".[dev]"`, see
69 changes: 68 additions & 1 deletion src/fractal_helper_tasks/__FRACTAL_MANIFEST__.json
@@ -33,7 +33,7 @@
"default": false,
"title": "Overwrite Input",
"type": "boolean",
-"description": "Whether"
+"description": "Whether the existing image should be overwritten with the new OME-Zarr without the T dimension."
}
},
"required": [
@@ -128,6 +128,73 @@
"title": "Convert2dSegmentationTo3d"
},
"docs_link": "https://github.com/jluethi/fractal-helper-tasks"
},
{
"name": "Rechunk OME-Zarr",
"tags": [
"Rechunking",
"Many files"
],
"docs_info": "### Purpose\n- Rechunks OME-Zarr to new chunking parameters: Changes whether the array is stored as many small files or few larger files.\n- Optionally applies the same rechunking to label images.\n\n### Outputs\n- A **new Zarr image** that is rechunked.\n",
"executable_parallel": "rechunk_zarr.py",
"meta_parallel": {
"cpus_per_task": 1,
"mem": 4000
},
"args_schema_parallel": {
"additionalProperties": false,
"properties": {
"zarr_url": {
"title": "Zarr Url",
"type": "string",
"description": "Path or url to the individual OME-Zarr image to be processed. (standard argument for Fractal tasks, managed by Fractal server)."
},
"chunk_sizes": {
"additionalProperties": {
"type": "integer"
},
"title": "Chunk Sizes",
"type": "object",
"description": "Dictionary of chunk sizes to adapt. One can set any of the t, c, z, y, x axes that exist in the input image to be resized to a different chunk size. For example, {\"y\": 4000, \"x\": 4000} will set a new x & y chunking while maintaining the other chunk sizes. {\"z\": 10} will just change the Z chunking while keeping all other chunk sizes the same as the input."
},
"suffix": {
"default": "rechunked",
"title": "Suffix",
"type": "string",
"description": "Suffix of the rechunked image."
},
"rechunk_labels": {
"default": true,
"title": "Rechunk Labels",
"type": "boolean",
"description": "Whether to apply the same rechunking to all label images of the OME-Zarr as well."
},
"rebuild_pyramids": {
"default": true,
"title": "Rebuild Pyramids",
"type": "boolean",
"description": "Whether pyramids are built fresh in the rechunked image. This has a small performance overhead, but ensures that this task is save against off-by-one issues when pyramid levels aren't easily downsampled by 2."
},
"overwrite_input": {
"default": true,
"title": "Overwrite Input",
"type": "boolean",
"description": "Whether the old image without rechunking should be overwritten (to avoid duplicating the data needed)."
},
"overwrite": {
"default": false,
"title": "Overwrite",
"type": "boolean",
"description": "Whether to overwrite potential pre-existing output with the name zarr_url_suffix."
}
},
"required": [
"zarr_url"
],
"type": "object",
"title": "RechunkZarr"
},
"docs_link": "https://github.com/jluethi/fractal-helper-tasks"
}
],
"has_args_schemas": true,
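For orientation, a few illustrative `chunk_sizes` values that match the schema above (the numbers are placeholders, not defaults):

```python
# Illustrative chunk_sizes arguments for the "Rechunk OME-Zarr" task
chunk_sizes = {"y": 4000, "x": 4000}  # new XY chunking; chunking of the other axes is unchanged
chunk_sizes = {"z": 10}               # only change the Z chunking
chunk_sizes = {"C": 1}                # axis names are case-insensitive (normalized to lower case)
```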
6 changes: 6 additions & 0 deletions src/fractal_helper_tasks/dev/docs_info/rechunk_zarr.md
@@ -0,0 +1,6 @@
### Purpose
- Rechunks OME-Zarr to new chunking parameters: Changes whether the array is stored as many small files or few larger files.
- Optionally applies the same rechunking to label images.

### Outputs
- A **new Zarr image** that is rechunked.
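A minimal usage sketch, assuming a local OME-Zarr image at a placeholder path (in a Fractal deployment the server supplies `zarr_url`):

```python
from fractal_helper_tasks.rechunk_zarr import rechunk_zarr

rechunk_zarr(
    zarr_url="/path/to/plate.zarr/B/03/0",  # placeholder path
    chunk_sizes={"y": 4000, "x": 4000},     # keep the chunking of the other axes
    overwrite_input=False,                  # keep the original, write a *_rechunked copy
)
```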
10 changes: 10 additions & 0 deletions src/fractal_helper_tasks/dev/task_list.py
@@ -22,4 +22,14 @@
],
docs_info="file:docs_info/2d_to_3d.md",
),
ParallelTask(
name="Rechunk OME-Zarr",
executable="rechunk_zarr.py",
meta={"cpus_per_task": 1, "mem": 4000},
tags=[
"Rechunking",
"Many files",
],
docs_info="file:docs_info/rechunk_zarr.md",
),
]
3 changes: 2 additions & 1 deletion src/fractal_helper_tasks/drop_t_dimension.py
@@ -53,7 +53,8 @@ def drop_t_dimension(
(standard argument for Fractal tasks, managed by Fractal server).
suffix: Suffix to be used for the new Zarr image. If overwrite_input
is True, this file is only temporary.
-overwrite_input: Whether
+overwrite_input: Whether the existing image should be overwritten with
+the new OME-Zarr without the T dimension.
"""
# Normalize zarr_url
zarr_url_old = zarr_url.rstrip("/")
145 changes: 145 additions & 0 deletions src/fractal_helper_tasks/rechunk_zarr.py
@@ -0,0 +1,145 @@
# Copyright 2025 (C) BioVisionCenter, University of Zurich
#
# Original authors:
# Joel Lüthi <[email protected]>
"""Rechunk an existing Zarr."""

import logging
import os
import shutil
from typing import Any, Optional

import ngio
from pydantic import validate_call

from fractal_helper_tasks.utils import normalize_chunk_size_dict, rechunk_label

logger = logging.getLogger(__name__)


@validate_call
def rechunk_zarr(
*,
zarr_url: str,
chunk_sizes: Optional[dict[str, Optional[int]]] = None,
suffix: str = "rechunked",
rechunk_labels: bool = True,
rebuild_pyramids: bool = True,
overwrite_input: bool = True,
overwrite: bool = False,
) -> Optional[dict[str, Any]]:
"""Drops singleton t dimension.

Args:
zarr_url: Path or url to the individual OME-Zarr image to be processed.
(standard argument for Fractal tasks, managed by Fractal server).
chunk_sizes: Dictionary of chunk sizes to adapt. One can set any of
the t, c, z, y, x axes that exist in the input image to be resized
to a different chunk size. For example, {"y": 4000, "x": 4000}
will set a new x & y chunking while maintaining the other chunk
sizes. {"z": 10} will just change the Z chunking while keeping
all other chunk sizes the same as the input.
suffix: Suffix of the rechunked image.
rechunk_labels: Whether to apply the same rechunking to all label
images of the OME-Zarr as well.
rebuild_pyramids: Whether pyramids are built fresh in the rechunked
image. This has a small performance overhead, but ensures that
this task is safe against off-by-one issues when pyramid levels
aren't easily downsampled by 2.
overwrite_input: Whether the old image without rechunking should be
overwritten (to avoid duplicating the data).
overwrite: Whether to overwrite potential pre-existing output with the
name {zarr_url}_{suffix}.
"""
logger.info(f"Running `rechunk_zarr` on {zarr_url=} with {chunk_sizes=}.")

chunk_sizes = normalize_chunk_size_dict(chunk_sizes)

rechunked_zarr_url = zarr_url + f"_{suffix}"
ngff_image = ngio.NgffImage(zarr_url)
pyramid_paths = ngff_image.levels_paths
highest_res_img = ngff_image.get_image()
axes_names = highest_res_img.dataset.on_disk_axes_names
chunks = highest_res_img.on_disk_dask_array.chunks

# Compute the chunksize tuple
new_chunksize = [c[0] for c in chunks]
logger.info(f"Initial chunk sizes were: {chunks}")
# Overwrite chunk_size with user-set chunksize
for i, axis in enumerate(axes_names):
if axis in chunk_sizes:
if chunk_sizes[axis] is not None:
new_chunksize[i] = chunk_sizes[axis]

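# Validate the requested axes before any data is written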
for axis in chunk_sizes:
if axis not in axes_names:
raise NotImplementedError(
f"Rechunking with {axis=} is specified, but the OME-Zarr only "
f"has the following axes: {axes_names}"
)

logger.info(f"Chunk sizes after rechunking will be: {new_chunksize=}")

new_ngff_image = ngff_image.derive_new_image(
store=rechunked_zarr_url,
name=ngff_image.image_meta.name,
overwrite=overwrite,
copy_labels=not rechunk_labels,
copy_tables=True,
chunks=new_chunksize,
)

ngff_image = ngio.NgffImage(zarr_url)

if rebuild_pyramids:
# Set the highest resolution, then consolidate to build a new pyramid
new_ngff_image.get_image(highest_resolution=True).set_array(
ngff_image.get_image(highest_resolution=True).on_disk_dask_array
)
new_ngff_image.get_image(highest_resolution=True).consolidate()
else:
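# Copy each pyramid level from the input; the new chunking is applied on write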
for path in pyramid_paths:
new_ngff_image.get_image(path=path).set_array(
ngff_image.get_image(path=path).on_disk_dask_array
)

# Rechunk label images
if rechunk_labels:
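# Label images have no channel axis, so drop any channel chunking before rechunking labels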
chunk_sizes["c"] = None
label_names = ngff_image.labels.list()
for label in label_names:
rechunk_label(
orig_ngff_image=ngff_image,
new_ngff_image=new_ngff_image,
label=label,
chunk_sizes=chunk_sizes,
overwrite=overwrite,
rebuild_pyramids=rebuild_pyramids,
)

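# Swap the rechunked image into the original location, then remove the original data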
if overwrite_input:
os.rename(zarr_url, f"{zarr_url}_tmp")
os.rename(rechunked_zarr_url, zarr_url)
shutil.rmtree(f"{zarr_url}_tmp")
return
else:
output = dict(
image_list_updates=[
dict(
zarr_url=rechunked_zarr_url,
origin=zarr_url,
types=dict(rechunked=True),
)
],
filters=dict(types=dict(rechunked=True)),
)
return output


if __name__ == "__main__":
from fractal_tasks_core.tasks._utils import run_fractal_task

run_fractal_task(
task_function=rechunk_zarr,
logger_name=logger.name,
)
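A quick way to verify the new chunking, reusing the same ngio accessors as the task itself (a sketch with a placeholder path, assuming `overwrite_input=False` so the `_rechunked` copy exists):

```python
import ngio

rechunked = ngio.NgffImage("/path/to/plate.zarr/B/03/0_rechunked")  # placeholder path
chunks = rechunked.get_image(highest_resolution=True).on_disk_dask_array.chunks
print(chunks)  # tuple of per-axis chunk tuples reflecting the requested chunk_sizes
```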
114 changes: 114 additions & 0 deletions src/fractal_helper_tasks/utils.py
@@ -0,0 +1,114 @@
# Copyright 2025 (C) BioVisionCenter, University of Zurich
#
# Original authors:
# Joel Lüthi <[email protected]>
"""Utils for helper tasks."""

from typing import Optional

import ngio
from ngio.core.utils import create_empty_ome_zarr_label


def normalize_chunk_size_dict(chunk_sizes: dict[str, Optional[int]]):
"""Converts all chunk_size axes names to lower case and assert validity.

Args:
chunk_sizes: Dictionary of chunk sizes that should be adapted. Can
contain new chunk sizes for t, c, z, y & x.

Returns:
chunk_sizes_norm: Normalized chunk_sizes dict.
"""
chunk_sizes = chunk_sizes or {}
chunk_sizes_norm = {}
for key, value in chunk_sizes.items():
chunk_sizes_norm[key.lower()] = value

valid_axes = ["t", "c", "z", "y", "x"]
for axis in chunk_sizes_norm:
if axis not in valid_axes:
raise ValueError(
f"Axis {axis} is not supported. Valid axes choices are "
f"{valid_axes}."
)
return chunk_sizes_norm


def rechunk_label(
orig_ngff_image: ngio.NgffImage,
new_ngff_image: ngio.NgffImage,
label: str,
chunk_sizes: dict[str, Optional[int]],
overwrite: bool = False,
rebuild_pyramids: bool = True,
):
"""Saves a rechunked label image into a new OME-Zarr

The label image is based on an existing label image in another OME-Zarr.

Args:
orig_ngff_image: Original OME-Zarr that contains the label image
new_ngff_image: OME-Zarr to which the rechunked label image should be
added.
label: Name of the label image.
chunk_sizes: New chunk sizes that should be applied
overwrite: Whether the label image in `new_ngff_image` should be
overwritten if it already exists.
rebuild_pyramids: Whether pyramids are built fresh in the rechunked
label image. This has a small performance overhead, but ensures
that this task is safe against off-by-one issues when pyramid
levels aren't easily downsampled by 2.
"""
old_label = orig_ngff_image.labels.get_label(name=label)
label_level_paths = orig_ngff_image.labels.levels_paths(name=label)
# Compute the chunksize tuple
chunks = old_label.on_disk_dask_array.chunks
new_chunksize = [c[0] for c in chunks]
# Overwrite chunk_size with user-set chunksize
for i, axis in enumerate(old_label.dataset.on_disk_axes_names):
if axis in chunk_sizes:
if chunk_sizes[axis] is not None:
new_chunksize[i] = chunk_sizes[axis]
create_empty_ome_zarr_label(
store=new_ngff_image.store + "/" + "labels" + "/" + label,
on_disk_shape=old_label.on_disk_shape,
chunks=new_chunksize,
dtype=old_label.on_disk_dask_array.dtype,
on_disk_axis=old_label.dataset.on_disk_axes_names,
pixel_sizes=old_label.dataset.pixel_size,
xy_scaling_factor=old_label.metadata.xy_scaling_factor,
z_scaling_factor=old_label.metadata.z_scaling_factor,
time_spacing=old_label.dataset.time_spacing,
time_units=old_label.dataset.time_axis_unit,
levels=label_level_paths,
name=label,
overwrite=overwrite,
version=old_label.metadata.version,
)

# Fill in labels .attrs to contain the label name
list_of_labels = new_ngff_image.labels.list()
if label not in list_of_labels:
new_ngff_image.labels._label_group.attrs["labels"] = [
*list_of_labels,
label,
]

if rebuild_pyramids:
# Set the highest resolution, then consolidate to build a new pyramid
new_ngff_image.labels.get_label(name=label, highest_resolution=True).set_array(
orig_ngff_image.labels.get_label(
name=label, highest_resolution=True
).on_disk_dask_array
)
new_ngff_image.labels.get_label(
name=label, highest_resolution=True
).consolidate()
else:
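# Copy each label pyramid level from the original; the new chunking is applied on write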
for label_path in label_level_paths:
new_ngff_image.labels.get_label(name=label, path=label_path).set_array(
orig_ngff_image.labels.get_label(
name=label, path=label_path
).on_disk_dask_array
)
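A small sketch of how the normalization helper behaves, following the code above (keys are lower-cased, unknown axes raise a ValueError):

```python
from fractal_helper_tasks.utils import normalize_chunk_size_dict

print(normalize_chunk_size_dict({"Y": 4000, "X": 4000}))  # {'y': 4000, 'x': 4000}
normalize_chunk_size_dict({"foo": 1})  # raises ValueError: axis "foo" is not supported
```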