diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 7cec8a9..9f70ea2 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -17,10 +17,8 @@ jobs: strategy: matrix: os: [ubuntu-22.04, macos-latest] - python-version: ["3.9", "3.10", "3.11"] + python-version: ["3.10", "3.11", "3.12"] exclude: - - os: macos-latest - python-version: '3.9' - os: macos-latest python-version: '3.10' name: "Core, Python ${{ matrix.python-version }}, ${{ matrix.os }}" diff --git a/pyproject.toml b/pyproject.toml index 91a8cfe..c9d308a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,9 +25,9 @@ authors = [ ] # Required Python version and dependencies -requires-python = ">=3.9" +requires-python = ">=3.10" dependencies = [ - "fractal-tasks-core==1.3.4" + "fractal-tasks-core==1.4.0","ngio==0.1.4", ] # Optional dependencies (e.g. for `pip install -e ".[dev]"`, see diff --git a/src/fractal_helper_tasks/__FRACTAL_MANIFEST__.json b/src/fractal_helper_tasks/__FRACTAL_MANIFEST__.json index 99941bc..60cd415 100644 --- a/src/fractal_helper_tasks/__FRACTAL_MANIFEST__.json +++ b/src/fractal_helper_tasks/__FRACTAL_MANIFEST__.json @@ -33,7 +33,7 @@ "default": false, "title": "Overwrite Input", "type": "boolean", - "description": "Whether" + "description": "Whether the existing image should be overwritten with the new OME-Zarr without the T dimension." 
} }, "required": [ @@ -128,6 +128,73 @@ "title": "Convert2dSegmentationTo3d" }, "docs_link": "https://github.com/jluethi/fractal-helper-tasks" + }, + { + "name": "Rechunk OME-Zarr", + "tags": [ + "Rechunking", + "Many files" + ], + "docs_info": "### Purpose\n- Rechunks OME-Zarr to new chunking parameters: Changes whether the array is stored as many small files or few larger files.\n- Optionally applies the same rechunking to label images.\n\n### Outputs\n- A **new Zarr image** that is rechunked.\n", + "executable_parallel": "rechunk_zarr.py", + "meta_parallel": { + "cpus_per_task": 1, + "mem": 4000 + }, + "args_schema_parallel": { + "additionalProperties": false, + "properties": { + "zarr_url": { + "title": "Zarr Url", + "type": "string", + "description": "Path or url to the individual OME-Zarr image to be processed. (standard argument for Fractal tasks, managed by Fractal server)." + }, + "chunk_sizes": { + "additionalProperties": { + "type": "integer" + }, + "title": "Chunk Sizes", + "type": "object", + "description": "Dictionary of chunk sizes to adapt. One can set any of the t, c, z, y, x axes that exist in the input image to be resized to a different chunk size. For example, {\"y\": 4000, \"x\": 4000} will set a new x & y chunking while maintaining the other chunk sizes. {\"z\": 10} will just change the Z chunking while keeping all other chunk sizes the same as the input." + }, + "suffix": { + "default": "rechunked", + "title": "Suffix", + "type": "string", + "description": "Suffix of the rechunked image." + }, + "rechunk_labels": { + "default": true, + "title": "Rechunk Labels", + "type": "boolean", + "description": "Whether to apply the same rechunking to all label images of the OME-Zarr as well." + }, + "rebuild_pyramids": { + "default": true, + "title": "Rebuild Pyramids", + "type": "boolean", + "description": "Whether pyramids are built fresh in the rechunked image. 
This has a small performance overhead, but ensures that this task is safe against off-by-one issues when pyramid levels aren't easily downsampled by 2." + }, + "overwrite_input": { + "default": true, + "title": "Overwrite Input", + "type": "boolean", + "description": "Whether the old image without rechunking should be overwritten (to avoid duplicating the data needed)." + }, + "overwrite": { + "default": false, + "title": "Overwrite", + "type": "boolean", + "description": "Whether to overwrite potential pre-existing output with the name zarr_url_suffix." + } + }, + "required": [ + "zarr_url" + ], + "type": "object", + "title": "RechunkZarr" + }, + "docs_link": "https://github.com/jluethi/fractal-helper-tasks" + } ], "has_args_schemas": true, diff --git a/src/fractal_helper_tasks/dev/docs_info/rechunk_zarr.md b/src/fractal_helper_tasks/dev/docs_info/rechunk_zarr.md new file mode 100644 index 0000000..6f79c9d --- /dev/null +++ b/src/fractal_helper_tasks/dev/docs_info/rechunk_zarr.md @@ -0,0 +1,6 @@ +### Purpose +- Rechunks OME-Zarr to new chunking parameters: Changes whether the array is stored as many small files or few larger files. +- Optionally applies the same rechunking to label images. + +### Outputs +- A **new Zarr image** that is rechunked. 
diff --git a/src/fractal_helper_tasks/dev/task_list.py b/src/fractal_helper_tasks/dev/task_list.py index 4aa71f5..8289643 100644 --- a/src/fractal_helper_tasks/dev/task_list.py +++ b/src/fractal_helper_tasks/dev/task_list.py @@ -22,4 +22,14 @@ ], docs_info="file:docs_info/2d_to_3d.md", ), + ParallelTask( + name="Rechunk OME-Zarr", + executable="rechunk_zarr.py", + meta={"cpus_per_task": 1, "mem": 4000}, + tags=[ + "Rechunking", + "Many files", + ], + docs_info="file:docs_info/rechunk_zarr.md", + ), ] diff --git a/src/fractal_helper_tasks/drop_t_dimension.py b/src/fractal_helper_tasks/drop_t_dimension.py index 9c59de5..782c34b 100644 --- a/src/fractal_helper_tasks/drop_t_dimension.py +++ b/src/fractal_helper_tasks/drop_t_dimension.py @@ -53,7 +53,8 @@ def drop_t_dimension( (standard argument for Fractal tasks, managed by Fractal server). suffix: Suffix to be used for the new Zarr image. If overwrite_input is True, this file is only temporary. - overwrite_input: Whether + overwrite_input: Whether the existing image should be overwritten with + the new OME-Zarr without the T dimension. 
""" # Normalize zarr_url zarr_url_old = zarr_url.rstrip("/") diff --git a/src/fractal_helper_tasks/rechunk_zarr.py b/src/fractal_helper_tasks/rechunk_zarr.py new file mode 100644 index 0000000..5c48f1f --- /dev/null +++ b/src/fractal_helper_tasks/rechunk_zarr.py @@ -0,0 +1,145 @@ +# Copyright 2025 (C) BioVisionCenter, University of Zurich +# +# Original authors: +# Joel Lüthi +"""Rechunk an existing Zarr.""" + +import logging +import os +import shutil +from typing import Any, Optional + +import ngio +from pydantic import validate_call + +from fractal_helper_tasks.utils import normalize_chunk_size_dict, rechunk_label + +logger = logging.getLogger(__name__) + + +@validate_call +def rechunk_zarr( + *, + zarr_url: str, + chunk_sizes: Optional[dict[str, Optional[int]]] = None, + suffix: str = "rechunked", + rechunk_labels: bool = True, + rebuild_pyramids: bool = True, + overwrite_input: bool = True, + overwrite: bool = False, +) -> dict[str, Any]: + """Drops singleton t dimension. + + Args: + zarr_url: Path or url to the individual OME-Zarr image to be processed. + (standard argument for Fractal tasks, managed by Fractal server). + chunk_sizes: Dictionary of chunk sizes to adapt. One can set any of + the t, c, z, y, x axes that exist in the input image to be resized + to a different chunk size. For example, {"y": 4000, "x": 4000} + will set a new x & y chunking while maintaining the other chunk + sizes. {"z": 10} will just change the Z chunking while keeping + all other chunk sizes the same as the input. + suffix: Suffix of the rechunked image. + rechunk_labels: Whether to apply the same rechunking to all label + images of the OME-Zarr as well. + rebuild_pyramids: Whether pyramids are built fresh in the rechunked + image. This has a small performance overhead, but ensures that + this task is save against off-by-one issues when pyramid levels + aren't easily downsampled by 2. 
+ overwrite_input: Whether the old image without rechunking should be + overwritten (to avoid duplicating the data needed). + overwrite: Whether to overwrite potential pre-existing output with the + name zarr_url_suffix. + """ + logger.info(f"Running `rechunk_zarr` on {zarr_url=} with {chunk_sizes=}.") + + chunk_sizes = normalize_chunk_size_dict(chunk_sizes) + + rechunked_zarr_url = zarr_url + f"_{suffix}" + ngff_image = ngio.NgffImage(zarr_url) + pyramid_paths = ngff_image.levels_paths + highest_res_img = ngff_image.get_image() + axes_names = highest_res_img.dataset.on_disk_axes_names + chunks = highest_res_img.on_disk_dask_array.chunks + + # Compute the chunksize tuple + new_chunksize = [c[0] for c in chunks] + logger.info(f"Initial chunk sizes were: {chunks}") + # Overwrite chunk_size with user-set chunksize + for i, axis in enumerate(axes_names): + if axis in chunk_sizes: + if chunk_sizes[axis] is not None: + new_chunksize[i] = chunk_sizes[axis] + + for axis in chunk_sizes: + if axis not in axes_names: + raise NotImplementedError( + f"Rechunking with {axis=} is specified, but the OME-Zarr only " + f"has the following axes: {axes_names}" + ) + + logger.info(f"Chunk sizes after rechunking will be: {new_chunksize=}") + + new_ngff_image = ngff_image.derive_new_image( + store=rechunked_zarr_url, + name=ngff_image.image_meta.name, + overwrite=overwrite, + copy_labels=not rechunk_labels, + copy_tables=True, + chunks=new_chunksize, + ) + + ngff_image = ngio.NgffImage(zarr_url) + + if rebuild_pyramids: + # Set the highest resolution, then consolidate to build a new pyramid + new_ngff_image.get_image(highest_resolution=True).set_array( + ngff_image.get_image(highest_resolution=True).on_disk_dask_array + ) + new_ngff_image.get_image(highest_resolution=True).consolidate() + else: + for path in pyramid_paths: + new_ngff_image.get_image(path=path).set_array( + ngff_image.get_image(path=path).on_disk_dask_array + ) + + # Copy labels + if rechunk_labels: + chunk_sizes["c"] = 
None + label_names = ngff_image.labels.list() + for label in label_names: + rechunk_label( + orig_ngff_image=ngff_image, + new_ngff_image=new_ngff_image, + label=label, + chunk_sizes=chunk_sizes, + overwrite=overwrite, + rebuild_pyramids=rebuild_pyramids, + ) + + if overwrite_input: + os.rename(zarr_url, f"{zarr_url}_tmp") + os.rename(rechunked_zarr_url, zarr_url) + shutil.rmtree(f"{zarr_url}_tmp") + return + else: + output = dict( + image_list_updates=[ + dict( + zarr_url=rechunked_zarr_url, + origin=zarr_url, + types=dict(rechunked=True), + ) + ], + filters=dict(types=dict(rechunked=True)), + ) + return output + + +if __name__ == "__main__": + from fractal_tasks_core.tasks._utils import run_fractal_task + + run_fractal_task( + task_function=rechunk_zarr, + logger_name=logger.name, + ) diff --git a/src/fractal_helper_tasks/utils.py b/src/fractal_helper_tasks/utils.py new file mode 100644 index 0000000..3c5c609 --- /dev/null +++ b/src/fractal_helper_tasks/utils.py @@ -0,0 +1,114 @@ +# Copyright 2025 (C) BioVisionCenter, University of Zurich +# +# Original authors: +# Joel Lüthi +"""Utils for helper tasks.""" + +from typing import Optional + +import ngio +from ngio.core.utils import create_empty_ome_zarr_label + + +def normalize_chunk_size_dict(chunk_sizes: dict[str, Optional[int]]): + """Converts all chunk_size axes names to lower case and assert validity. + + Args: + chunk_sizes: Dictionary of chunk sizes that should be adapted. Can + contain new chunk sizes for t, c, z, y & x. + + Returns: + chunk_sizes_norm: Normalized chunk_sizes dict. + """ + chunk_sizes = chunk_sizes or {} + chunk_sizes_norm = {} + for key, value in chunk_sizes.items(): + chunk_sizes_norm[key.lower()] = value + + valid_axes = ["t", "c", "z", "y", "x"] + for axis in chunk_sizes_norm: + if axis not in valid_axes: + raise ValueError( + f"Axis {axis} is not supported. Valid axes choices are " + f"{valid_axes}." 
+ ) + return chunk_sizes_norm + + +def rechunk_label( + orig_ngff_image: ngio.NgffImage, + new_ngff_image: ngio.NgffImage, + label: str, + chunk_sizes: list[int], + overwrite: bool = False, + rebuild_pyramids: bool = True, +): + """Saves a rechunked label image into a new OME-Zarr + + The label image is based on an existing label image in another OME-Zarr. + + Args: + orig_ngff_image: Original OME-Zarr that contains the label image + new_ngff_image: OME-Zarr to which the rechunked label image should be + added. + label: Name of the label image. + chunk_sizes: New chunk sizes that should be applied + overwrite: Whether the label image in `new_ngff_image` should be + overwritten if it already exists. + rebuild_pyramids: Whether pyramids are built fresh in the rechunked + label image. This has a small performance overhead, but ensures + that this task is safe against off-by-one issues when pyramid + levels aren't easily downsampled by 2. + """ + old_label = orig_ngff_image.labels.get_label(name=label) + label_level_paths = orig_ngff_image.labels.levels_paths(name=label) + # Compute the chunksize tuple + chunks = old_label.on_disk_dask_array.chunks + new_chunksize = [c[0] for c in chunks] + # Overwrite chunk_size with user-set chunksize + for i, axis in enumerate(old_label.dataset.on_disk_axes_names): + if axis in chunk_sizes: + if chunk_sizes[axis] is not None: + new_chunksize[i] = chunk_sizes[axis] + create_empty_ome_zarr_label( + store=new_ngff_image.store + "/" + "labels" + "/" + label, + on_disk_shape=old_label.on_disk_shape, + chunks=new_chunksize, + dtype=old_label.on_disk_dask_array.dtype, + on_disk_axis=old_label.dataset.on_disk_axes_names, + pixel_sizes=old_label.dataset.pixel_size, + xy_scaling_factor=old_label.metadata.xy_scaling_factor, + z_scaling_factor=old_label.metadata.z_scaling_factor, + time_spacing=old_label.dataset.time_spacing, + time_units=old_label.dataset.time_axis_unit, + levels=label_level_paths, + name=label, + overwrite=overwrite, + 
version=old_label.metadata.version, + ) + + # Fill in labels .attrs to contain the label name + list_of_labels = new_ngff_image.labels.list() + if label not in list_of_labels: + new_ngff_image.labels._label_group.attrs["labels"] = [ + *list_of_labels, + label, + ] + + if rebuild_pyramids: + # Set the highest resolution, then consolidate to build a new pyramid + new_ngff_image.labels.get_label(name=label, highest_resolution=True).set_array( + orig_ngff_image.labels.get_label( + name=label, highest_resolution=True + ).on_disk_dask_array + ) + new_ngff_image.labels.get_label( + name=label, highest_resolution=True + ).consolidate() + else: + for label_path in label_level_paths: + new_ngff_image.labels.get_label(name=label, path=label_path).set_array( + orig_ngff_image.labels.get_label( + name=label, path=label_path + ).on_disk_dask_array + ) diff --git a/tests/test_rechunk_zarr.py b/tests/test_rechunk_zarr.py new file mode 100644 index 0000000..2e7deb3 --- /dev/null +++ b/tests/test_rechunk_zarr.py @@ -0,0 +1,134 @@ +"""Tests for the Rechunk OME-Zarr task.""" + +import ngio +import pytest + +from fractal_helper_tasks.rechunk_zarr import ( + rechunk_zarr, +) + + +@pytest.mark.parametrize( + "chunk_sizes, output_chunk_sizes", + [ + ({"x": 1000, "y": 1000}, [1, 1, 1000, 1000]), + ({"X": 1000, "Y": 1000}, [1, 1, 1000, 1000]), + ({"x": 6000, "y": 6000}, [1, 1, 2160, 5120]), + ({}, [1, 1, 2160, 2560]), + ({"x": None, "y": None}, [1, 1, 2160, 2560]), + ({"z": 10}, [1, 1, 2160, 2560]), + ({"Z": 10}, [1, 1, 2160, 2560]), + ], +) +def test_rechunk_2d(tmp_zenodo_zarr: list[str], chunk_sizes, output_chunk_sizes): + zarr_url = f"{tmp_zenodo_zarr[1]}/B/03/0" + + rechunk_zarr( + zarr_url=zarr_url, + chunk_sizes=chunk_sizes, + ) + + chunks = ngio.NgffImage(zarr_url).get_image().on_disk_dask_array.chunks + chunk_sizes = [c[0] for c in chunks] + assert chunk_sizes == output_chunk_sizes + + +@pytest.mark.parametrize( + "chunk_sizes, output_chunk_sizes", + [ + ({"x": None, "y": None}, [1, 1, 
2160, 2560]), + ({"z": 10}, [1, 2, 2160, 2560]), + ], +) +def test_rechunk_3d(tmp_zenodo_zarr: list[str], chunk_sizes, output_chunk_sizes): + zarr_url = f"{tmp_zenodo_zarr[0]}/B/03/0" + + rechunk_zarr( + zarr_url=zarr_url, + chunk_sizes=chunk_sizes, + ) + + chunks = ngio.NgffImage(zarr_url).get_image().on_disk_dask_array.chunks + chunk_sizes = [c[0] for c in chunks] + assert chunk_sizes == output_chunk_sizes + + +@pytest.mark.parametrize( + "rechunk_labels, output_chunk_sizes", + [ + (True, [1, 300, 300]), + (False, [1, 540, 1280]), + ], +) +def test_rechunk_labels(tmp_zenodo_zarr: list[str], rechunk_labels, output_chunk_sizes): + zarr_url = f"{tmp_zenodo_zarr[1]}/B/03/0" + chunk_sizes = {"x": 300, "y": 300} + + rechunk_zarr( + zarr_url=zarr_url, + chunk_sizes=chunk_sizes, + rechunk_labels=rechunk_labels, + ) + chunks = ( + ngio.NgffImage(zarr_url) + .labels.get_label(name="nuclei", path="0") + .on_disk_dask_array.chunks + ) + chunk_sizes = [c[0] for c in chunks] + assert chunk_sizes == output_chunk_sizes + + +@pytest.mark.parametrize( + "chunk_sizes, error_axes", + [ + ({"test": 1000, "y": 1000}, "test"), + ({"u": 1000}, "u"), + ], +) +def test_invalid_axis(tmp_zenodo_zarr: list[str], chunk_sizes, error_axes): + zarr_url = f"{tmp_zenodo_zarr[1]}/B/03/0" + + with pytest.raises(ValueError) as e: + rechunk_zarr( + zarr_url=zarr_url, + chunk_sizes=chunk_sizes, + ) + assert f"Axis {error_axes} is not supported" in str(e.value) + + +def test_rechunk_no_overwrite_input(tmp_zenodo_zarr: list[str]): + zarr_url = f"{tmp_zenodo_zarr[1]}/B/03/0" + suffix = "rechunked_custom" + new_zarr_url = f"{zarr_url}_{suffix}" + overwrite_input = False + chunk_sizes = {"x": 1000, "y": 1000} + output_chunk_sizes = [1, 1, 1000, 1000] + original_chunk_sizes = [1, 1, 2160, 2560] + + output = rechunk_zarr( + zarr_url=zarr_url, + chunk_sizes=chunk_sizes, + suffix=suffix, + overwrite_input=overwrite_input, + ) + expected_output = dict( + image_list_updates=[ + dict( + zarr_url=new_zarr_url, + 
origin=zarr_url, + types=dict(rechunked=True), + ) + ], + filters=dict(types=dict(rechunked=True)), + ) + assert expected_output == output + + # Existing zarr should be unchanged, but new zarr should have + # expected chunking + chunks = ngio.NgffImage(zarr_url).get_image().on_disk_dask_array.chunks + chunk_sizes = [c[0] for c in chunks] + assert chunk_sizes == original_chunk_sizes + + chunks = ngio.NgffImage(new_zarr_url).get_image().on_disk_dask_array.chunks + chunk_sizes = [c[0] for c in chunks] + assert chunk_sizes == output_chunk_sizes