Skip to content

Commit 19060da

Browse files
authored
Merge pull request #18 from fractal-analytics-platform/rechunking_task
Add rechunking task
2 parents 438bf73 + fb30ade commit 19060da

File tree

9 files changed

+482
-7
lines changed

9 files changed

+482
-7
lines changed

.github/workflows/build_and_test.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ jobs:
1717
strategy:
1818
matrix:
1919
os: [ubuntu-22.04, macos-latest]
20-
python-version: ["3.9", "3.10", "3.11"]
20+
python-version: ["3.10", "3.11", "3.12"]
2121
exclude:
22-
- os: macos-latest
23-
python-version: '3.9'
2422
- os: macos-latest
2523
python-version: '3.10'
2624
name: "Core, Python ${{ matrix.python-version }}, ${{ matrix.os }}"

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ authors = [
2525
]
2626

2727
# Required Python version and dependencies
28-
requires-python = ">=3.9"
28+
requires-python = ">=3.10"
2929
dependencies = [
30-
"fractal-tasks-core==1.3.4"
30+
"fractal-tasks-core==1.4.0","ngio==0.1.4",
3131
]
3232

3333
# Optional dependencies (e.g. for `pip install -e ".[dev]"`, see

src/fractal_helper_tasks/__FRACTAL_MANIFEST__.json

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"default": false,
3434
"title": "Overwrite Input",
3535
"type": "boolean",
36-
"description": "Whether"
36+
"description": "Whether the existing image should be overwritten with the new OME-Zarr without the T dimension."
3737
}
3838
},
3939
"required": [
@@ -128,6 +128,73 @@
128128
"title": "Convert2dSegmentationTo3d"
129129
},
130130
"docs_link": "https://github.com/jluethi/fractal-helper-tasks"
131+
},
132+
{
133+
"name": "Rechunk OME-Zarr",
134+
"tags": [
135+
"Rechunking",
136+
"Many files"
137+
],
138+
"docs_info": "### Purpose\n- Rechunks OME-Zarr to new chunking parameters: Changes whether the array is stored as many small files or few larger files.\n- Optionally applies the same rechunking to label images.\n\n### Outputs\n- A **new Zarr image** that is rechunked.\n",
139+
"executable_parallel": "rechunk_zarr.py",
140+
"meta_parallel": {
141+
"cpus_per_task": 1,
142+
"mem": 4000
143+
},
144+
"args_schema_parallel": {
145+
"additionalProperties": false,
146+
"properties": {
147+
"zarr_url": {
148+
"title": "Zarr Url",
149+
"type": "string",
150+
"description": "Path or url to the individual OME-Zarr image to be processed. (standard argument for Fractal tasks, managed by Fractal server)."
151+
},
152+
"chunk_sizes": {
153+
"additionalProperties": {
154+
"type": "integer"
155+
},
156+
"title": "Chunk Sizes",
157+
"type": "object",
158+
"description": "Dictionary of chunk sizes to adapt. One can set any of the t, c, z, y, x axes that exist in the input image to be resized to a different chunk size. For example, {\"y\": 4000, \"x\": 4000} will set a new x & y chunking while maintaining the other chunk sizes. {\"z\": 10} will just change the Z chunking while keeping all other chunk sizes the same as the input."
159+
},
160+
"suffix": {
161+
"default": "rechunked",
162+
"title": "Suffix",
163+
"type": "string",
164+
"description": "Suffix of the rechunked image."
165+
},
166+
"rechunk_labels": {
167+
"default": true,
168+
"title": "Rechunk Labels",
169+
"type": "boolean",
170+
"description": "Whether to apply the same rechunking to all label images of the OME-Zarr as well."
171+
},
172+
"rebuild_pyramids": {
173+
"default": true,
174+
"title": "Rebuild Pyramids",
175+
"type": "boolean",
176+
"description": "Whether pyramids are built fresh in the rechunked image. This has a small performance overhead, but ensures that this task is safe against off-by-one issues when pyramid levels aren't easily downsampled by 2."
177+
},
178+
"overwrite_input": {
179+
"default": true,
180+
"title": "Overwrite Input",
181+
"type": "boolean",
182+
"description": "Whether the old image without rechunking should be overwritten (to avoid duplicating the data needed)."
183+
},
184+
"overwrite": {
185+
"default": false,
186+
"title": "Overwrite",
187+
"type": "boolean",
188+
"description": "Whether to overwrite potential pre-existing output with the name zarr_url_suffix."
189+
}
190+
},
191+
"required": [
192+
"zarr_url"
193+
],
194+
"type": "object",
195+
"title": "RechunkZarr"
196+
},
197+
"docs_link": "https://github.com/jluethi/fractal-helper-tasks"
131198
}
132199
],
133200
"has_args_schemas": true,
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
### Purpose
2+
- Rechunks OME-Zarr to new chunking parameters: Changes whether the array is stored as many small files or few larger files.
3+
- Optionally applies the same rechunking to label images.
4+
5+
### Outputs
6+
- A **new Zarr image** that is rechunked.

src/fractal_helper_tasks/dev/task_list.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,14 @@
2222
],
2323
docs_info="file:docs_info/2d_to_3d.md",
2424
),
25+
ParallelTask(
26+
name="Rechunk OME-Zarr",
27+
executable="rechunk_zarr.py",
28+
meta={"cpus_per_task": 1, "mem": 4000},
29+
tags=[
30+
"Rechunking",
31+
"Many files",
32+
],
33+
docs_info="file:docs_info/rechunk_zarr.md",
34+
),
2535
]

src/fractal_helper_tasks/drop_t_dimension.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ def drop_t_dimension(
5353
(standard argument for Fractal tasks, managed by Fractal server).
5454
suffix: Suffix to be used for the new Zarr image. If overwrite_input
5555
is True, this file is only temporary.
56-
overwrite_input: Whether
56+
overwrite_input: Whether the existing image should be overwritten with
57+
the new OME-Zarr without the T dimension.
5758
"""
5859
# Normalize zarr_url
5960
zarr_url_old = zarr_url.rstrip("/")
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# Copyright 2025 (C) BioVisionCenter, University of Zurich
2+
#
3+
# Original authors:
4+
# Joel Lüthi <[email protected]>
5+
"""Rechunk an existing Zarr."""
6+
7+
import logging
8+
import os
9+
import shutil
10+
from typing import Any, Optional
11+
12+
import ngio
13+
from pydantic import validate_call
14+
15+
from fractal_helper_tasks.utils import normalize_chunk_size_dict, rechunk_label
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
@validate_call
def rechunk_zarr(
    *,
    zarr_url: str,
    chunk_sizes: Optional[dict[str, Optional[int]]] = None,
    suffix: str = "rechunked",
    rechunk_labels: bool = True,
    rebuild_pyramids: bool = True,
    overwrite_input: bool = True,
    overwrite: bool = False,
) -> Optional[dict[str, Any]]:
    """Rechunk an OME-Zarr image (and optionally its label images).

    A new OME-Zarr named `{zarr_url}_{suffix}` is derived from the input
    with the requested chunk sizes. Depending on `overwrite_input`, the new
    image then either replaces the input or is reported as a new image.

    Args:
        zarr_url: Path or url to the individual OME-Zarr image to be processed.
            (standard argument for Fractal tasks, managed by Fractal server).
        chunk_sizes: Dictionary of chunk sizes to adapt. One can set any of
            the t, c, z, y, x axes that exist in the input image to be resized
            to a different chunk size. For example, {"y": 4000, "x": 4000}
            will set a new x & y chunking while maintaining the other chunk
            sizes. {"z": 10} will just change the Z chunking while keeping
            all other chunk sizes the same as the input.
        suffix: Suffix of the rechunked image.
        rechunk_labels: Whether to apply the same rechunking to all label
            images of the OME-Zarr as well.
        rebuild_pyramids: Whether pyramids are built fresh in the rechunked
            image. This has a small performance overhead, but ensures that
            this task is safe against off-by-one issues when pyramid levels
            aren't easily downsampled by 2.
        overwrite_input: Whether the old image without rechunking should be
            overwritten (to avoid duplicating the data needed).
        overwrite: Whether to overwrite potential pre-existing output with the
            name zarr_url_suffix.

    Returns:
        `None` when `overwrite_input` is True (the input was replaced in
        place). Otherwise a dict with `image_list_updates` and `filters`
        entries describing the newly created rechunked image.

    Raises:
        ValueError: If `chunk_sizes` contains an axis name other than
            t, c, z, y or x (raised by `normalize_chunk_size_dict`).
        NotImplementedError: If `chunk_sizes` names an axis that the input
            image does not have.
    """
    logger.info(f"Running `rechunk_zarr` on {zarr_url=} with {chunk_sizes=}.")

    # Normalize zarr_url: a trailing slash would otherwise place the
    # "_{suffix}" and "_tmp" paths inside the input Zarr itself.
    zarr_url = zarr_url.rstrip("/")

    chunk_sizes = normalize_chunk_size_dict(chunk_sizes)

    rechunked_zarr_url = f"{zarr_url}_{suffix}"
    ngff_image = ngio.NgffImage(zarr_url)
    pyramid_paths = ngff_image.levels_paths
    highest_res_img = ngff_image.get_image()
    axes_names = highest_res_img.dataset.on_disk_axes_names
    chunks = highest_res_img.on_disk_dask_array.chunks

    # Start from the current chunk size of each axis (first chunk per axis)
    new_chunksize = [c[0] for c in chunks]
    logger.info(f"Initial chunk sizes were: {chunks}")
    # Overwrite with the user-set chunk sizes; None means "keep as is"
    for i, axis in enumerate(axes_names):
        requested_size = chunk_sizes.get(axis)
        if requested_size is not None:
            new_chunksize[i] = requested_size

    # Reject axes that are valid names but absent from this image, before
    # anything is written to disk.
    for axis in chunk_sizes:
        if axis not in axes_names:
            raise NotImplementedError(
                f"Rechunking with {axis=} is specified, but the OME-Zarr only "
                f"has the following axes: {axes_names}"
            )

    logger.info(f"Chunk sizes after rechunking will be: {new_chunksize=}")

    # Labels are only copied as-is when they are not rechunked below.
    new_ngff_image = ngff_image.derive_new_image(
        store=rechunked_zarr_url,
        name=ngff_image.image_meta.name,
        overwrite=overwrite,
        copy_labels=not rechunk_labels,
        copy_tables=True,
        chunks=new_chunksize,
    )

    # NOTE(review): the input is re-opened after deriving the new image;
    # presumably to get a fresh handle - confirm whether this is required.
    ngff_image = ngio.NgffImage(zarr_url)

    if rebuild_pyramids:
        # Set the highest resolution, then consolidate to build a new pyramid
        new_ngff_image.get_image(highest_resolution=True).set_array(
            ngff_image.get_image(highest_resolution=True).on_disk_dask_array
        )
        new_ngff_image.get_image(highest_resolution=True).consolidate()
    else:
        # Copy every existing pyramid level one-to-one
        for path in pyramid_paths:
            new_ngff_image.get_image(path=path).set_array(
                ngff_image.get_image(path=path).on_disk_dask_array
            )

    # Copy labels
    if rechunk_labels:
        # Never forward a channel chunk size to the label rechunking
        chunk_sizes["c"] = None
        label_names = ngff_image.labels.list()
        for label in label_names:
            rechunk_label(
                orig_ngff_image=ngff_image,
                new_ngff_image=new_ngff_image,
                label=label,
                chunk_sizes=chunk_sizes,
                overwrite=overwrite,
                rebuild_pyramids=rebuild_pyramids,
            )

    if overwrite_input:
        # Swap the rechunked image into place, then drop the old data
        os.rename(zarr_url, f"{zarr_url}_tmp")
        os.rename(rechunked_zarr_url, zarr_url)
        shutil.rmtree(f"{zarr_url}_tmp")
        return None

    output = dict(
        image_list_updates=[
            dict(
                zarr_url=rechunked_zarr_url,
                origin=zarr_url,
                types=dict(rechunked=True),
            )
        ],
        filters=dict(types=dict(rechunked=True)),
    )
    return output
137+
138+
139+
if __name__ == "__main__":
    # Script entry point: hand `rechunk_zarr` to the fractal-tasks-core task
    # runner (presumably it parses the arguments supplied by Fractal server
    # and calls the task function - see fractal-tasks-core for details).
    from fractal_tasks_core.tasks._utils import run_fractal_task

    run_fractal_task(
        task_function=rechunk_zarr,
        logger_name=logger.name,
    )

src/fractal_helper_tasks/utils.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright 2025 (C) BioVisionCenter, University of Zurich
2+
#
3+
# Original authors:
4+
# Joel Lüthi <[email protected]>
5+
"""Utils for helper tasks."""
6+
7+
from typing import Optional
8+
9+
import ngio
10+
from ngio.core.utils import create_empty_ome_zarr_label
11+
12+
13+
def normalize_chunk_size_dict(
    chunk_sizes: Optional[dict[str, Optional[int]]],
) -> dict[str, Optional[int]]:
    """Converts all chunk_size axes names to lower case and asserts validity.

    The input dictionary is not modified; a new dictionary is returned.

    Args:
        chunk_sizes: Dictionary of chunk sizes that should be adapted. Can
            contain new chunk sizes for t, c, z, y & x. `None` (or an empty
            dictionary) is treated as "no chunk sizes to change".

    Returns:
        chunk_sizes_norm: Normalized chunk_sizes dict with lower-case axis
            names as keys.

    Raises:
        ValueError: If a (lower-cased) axis name is not one of t, c, z, y, x.
    """
    chunk_sizes_norm = {
        key.lower(): value for key, value in (chunk_sizes or {}).items()
    }

    valid_axes = ["t", "c", "z", "y", "x"]
    for axis in chunk_sizes_norm:
        if axis not in valid_axes:
            raise ValueError(
                f"Axis {axis} is not supported. Valid axes choices are "
                f"{valid_axes}."
            )
    return chunk_sizes_norm
36+
37+
38+
def rechunk_label(
    orig_ngff_image: ngio.NgffImage,
    new_ngff_image: ngio.NgffImage,
    label: str,
    chunk_sizes: dict[str, Optional[int]],
    overwrite: bool = False,
    rebuild_pyramids: bool = True,
) -> None:
    """Saves a rechunked label image into a new OME-Zarr.

    The label image is based on an existing label image in another OME-Zarr.

    Args:
        orig_ngff_image: Original OME-Zarr that contains the label image
        new_ngff_image: OME-Zarr to which the rechunked label image should be
            added.
        label: Name of the label image.
        chunk_sizes: New chunk sizes that should be applied, keyed by axis
            name. Axes that are missing or set to None keep the chunk size
            of the original label image.
        overwrite: Whether the label image in `new_ngff_image` should be
            overwritten if it already exists.
        rebuild_pyramids: Whether pyramids are built fresh in the rechunked
            label image. This has a small performance overhead, but ensures
            that this task is safe against off-by-one issues when pyramid
            levels aren't easily downsampled by 2.
    """
    old_label = orig_ngff_image.labels.get_label(name=label)
    label_level_paths = orig_ngff_image.labels.levels_paths(name=label)
    # Start from the current chunk size of each axis (first chunk per axis)
    chunks = old_label.on_disk_dask_array.chunks
    new_chunksize = [c[0] for c in chunks]
    # Overwrite with the user-set chunk sizes; None means "keep as is"
    for i, axis in enumerate(old_label.dataset.on_disk_axes_names):
        requested_size = chunk_sizes.get(axis)
        if requested_size is not None:
            new_chunksize[i] = requested_size

    # Create the empty label container, mirroring the metadata of the
    # original label image but with the new chunking.
    create_empty_ome_zarr_label(
        store=new_ngff_image.store + "/labels/" + label,
        on_disk_shape=old_label.on_disk_shape,
        chunks=new_chunksize,
        dtype=old_label.on_disk_dask_array.dtype,
        on_disk_axis=old_label.dataset.on_disk_axes_names,
        pixel_sizes=old_label.dataset.pixel_size,
        xy_scaling_factor=old_label.metadata.xy_scaling_factor,
        z_scaling_factor=old_label.metadata.z_scaling_factor,
        time_spacing=old_label.dataset.time_spacing,
        time_units=old_label.dataset.time_axis_unit,
        levels=label_level_paths,
        name=label,
        overwrite=overwrite,
        version=old_label.metadata.version,
    )

    # Fill in labels .attrs to contain the label name
    list_of_labels = new_ngff_image.labels.list()
    if label not in list_of_labels:
        # NOTE(review): this writes through the private `_label_group`
        # attribute of ngio - confirm there is no public API for it.
        new_ngff_image.labels._label_group.attrs["labels"] = [
            *list_of_labels,
            label,
        ]

    if rebuild_pyramids:
        # Set the highest resolution, then consolidate to build a new pyramid
        new_ngff_image.labels.get_label(
            name=label, highest_resolution=True
        ).set_array(
            orig_ngff_image.labels.get_label(
                name=label, highest_resolution=True
            ).on_disk_dask_array
        )
        new_ngff_image.labels.get_label(
            name=label, highest_resolution=True
        ).consolidate()
    else:
        # Copy every existing pyramid level one-to-one
        for label_path in label_level_paths:
            new_ngff_image.labels.get_label(
                name=label, path=label_path
            ).set_array(
                orig_ngff_image.labels.get_label(
                    name=label, path=label_path
                ).on_disk_dask_array
            )

0 commit comments

Comments
 (0)