
Commit fce8f84

Integrate clustertools (#67)
* use cluster futures for all tools which used ParallelExecutor before
* fix redundant argument
* unify jobs and processes arg
* clean up code and fix test
* format
* add missing args
* remove explicit checking for --jobs parameter
* use v1.0 of cluster_tools
* update readme
* fix how args is None was handled
* reformat
* use namedtuple to mimic args
* reformat
* fix missing import
1 parent 9073703 commit fce8f84


9 files changed: +209 / -126 lines changed


README.md

Lines changed: 4 additions & 0 deletions
@@ -75,6 +75,10 @@ python -m wkcuber.compress --layer_name segmentation data/target data/target_com
 python -m wkcuber.metadata --name great_dataset --scale 11.24,11.24,25 data/target
 ```

+### Parallelization
+
+Most tasks can be configured to be executed in a parallelized manner. Via `--distribution_strategy` you can pass `multiprocessing` or `slurm`. The first can be further configured with `--jobs` and the latter via `--job_resources='{"mem": "10M"}'`. Use `--help` to get more information.
+
 ## Test data credits
 Excerpts for testing purposes have been sampled from:
 - Dow Jacobo Hossain Siletti Hudspeth (2018). **Connectomics of the zebrafish's lateral-line neuromast reveals wiring and miswiring in a simple microcircuit.** eLife. [DOI:10.7554/eLife.33988](https://elifesciences.org/articles/33988)
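
The new `--distribution_strategy`, `--jobs` and `--job_resources` flags are consumed by the `get_executor_for_args` helper in `wkcuber/utils.py`, whose diff is not shown on this page. A minimal sketch of how such a helper could map the flags onto cluster_tools executors (the body and keyword arguments below are assumptions, not the committed implementation):

```python
# Illustrative sketch only: the committed helper lives in wkcuber/utils.py,
# which is not part of this page. Names and kwargs below are assumptions.
import json

import cluster_tools


def get_executor_for_args(args):
    if args is None or getattr(args, "distribution_strategy", None) is None:
        # No CLI namespace available: fall back to a small local pool.
        return cluster_tools.get_executor("multiprocessing", max_workers=1)

    if args.distribution_strategy == "slurm":
        # --job_resources carries Slurm resources as a JSON string, e.g. '{"mem": "10M"}'.
        resources = json.loads(args.job_resources) if args.job_resources else {}
        return cluster_tools.get_executor("slurm", job_resources=resources)

    # Default: local multiprocessing, sized by --jobs.
    return cluster_tools.get_executor("multiprocessing", max_workers=int(args.jobs))
```

Returning a context manager keeps the call sites uniform: every tool can simply write `with get_executor_for_args(args) as executor:` regardless of the chosen strategy.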

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -6,3 +6,4 @@ pytest
 wkw>=0.0.6
 requests
 black
+git+git://github.com/scalableminds/[email protected]#egg=cluster_tools
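
The new dependency pins the v1.0 tag of scalableminds' cluster_tools, which provides `concurrent.futures`-style executors for multiprocessing and Slurm. A rough usage sketch, adapted from that library's README (treat the exact signatures as an assumption for v1.0):

```python
import cluster_tools


def square(n):
    return n * n


if __name__ == "__main__":
    # Valid strategies include "multiprocessing", "slurm" and "sequential".
    with cluster_tools.get_executor("multiprocessing") as executor:
        futures = [executor.submit(square, n) for n in (2, 3, 4)]
        # Futures follow the concurrent.futures API: .result() blocks until the
        # job has finished and re-raises any exception from the worker.
        print([f.result() for f in futures])
```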

wkcuber/__main__.py

Lines changed: 4 additions & 3 deletions
@@ -6,7 +6,7 @@
 from .downsampling import downsample_mags, DEFAULT_EDGE_LEN
 from .compress import compress_mag_inplace
 from .metadata import write_webknossos_metadata
-from .utils import add_verbose_flag, add_jobs_flag
+from .utils import add_verbose_flag, add_distribution_flags
 from .mag import Mag


@@ -66,7 +66,7 @@ def create_parser():
     )

     add_verbose_flag(parser)
-    add_jobs_flag(parser)
+    add_distribution_flags(parser)

     return parser

@@ -84,10 +84,11 @@ def create_parser():
         args.dtype,
         args.batch_size,
         args.jobs,
+        args,
     )

     if not args.no_compress:
-        compress_mag_inplace(args.target_path, args.layer_name, Mag(1), args.jobs)
+        compress_mag_inplace(args.target_path, args.layer_name, Mag(1), args.jobs, args)

     downsample_mags(
         args.target_path,
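
`add_jobs_flag` is replaced everywhere by `add_distribution_flags`, which is defined in `wkcuber/utils.py` and not part of this page. Based on the flags documented in the README change above, it plausibly registers something like the following; the exact names and defaults are assumptions:

```python
from argparse import ArgumentParser


# Hypothetical sketch of the helper from wkcuber/utils.py (not shown in this diff);
# flag names follow the README section added above, defaults are assumptions.
def add_distribution_flags(parser: ArgumentParser) -> None:
    parser.add_argument(
        "--jobs",
        "-j",
        help="Number of processes to use with the multiprocessing strategy",
        type=int,
        default=1,
    )
    parser.add_argument(
        "--distribution_strategy",
        help="Either 'multiprocessing' or 'slurm'",
        default="multiprocessing",
    )
    parser.add_argument(
        "--job_resources",
        help="JSON with resources for Slurm jobs, e.g. '{\"mem\": \"10M\"}'",
        default=None,
    )
```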

wkcuber/compress.py

Lines changed: 28 additions & 14 deletions
@@ -10,10 +10,11 @@

 from .utils import (
     add_verbose_flag,
-    add_jobs_flag,
     open_wkw,
     WkwDatasetInfo,
-    ParallelExecutor,
+    add_distribution_flags,
+    get_executor_for_args,
+    wait_and_ensure_success,
 )
 from .metadata import detect_resolutions
 from typing import List

@@ -44,8 +45,8 @@ def create_parser():
         "--mag", "-m", nargs="*", help="Magnification level", default=None
     )

-    add_jobs_flag(parser)
     add_verbose_flag(parser)
+    add_distribution_flags(parser)

     return parser

@@ -71,7 +72,7 @@ def compress_file_job(source_path, target_path):
         raise exc


-def compress_mag(source_path, layer_name, target_path, mag: Mag, jobs):
+def compress_mag(source_path, layer_name, target_path, mag: Mag, jobs, args=None):
     if path.exists(path.join(target_path, layer_name, str(mag))):
         logging.error("Target path '{}' already exists".format(target_path))
         exit(1)

@@ -80,18 +81,26 @@ def compress_mag(source_path, layer_name, target_path, mag: Mag, jobs):
     target_mag_path = path.join(target_path, layer_name, str(mag))
     logging.info("Compressing mag {0} in '{1}'".format(str(mag), target_mag_path))

-    with open_wkw(source_wkw_info) as source_wkw, ParallelExecutor(jobs) as pool:
+    with open_wkw(source_wkw_info) as source_wkw:
         source_wkw.compress(target_mag_path)
-        for file in source_wkw.list_files():
-            rel_file = path.relpath(file, source_wkw.root)
-            pool.submit(compress_file_job, file, path.join(target_mag_path, rel_file))
+        with get_executor_for_args(args) as executor:
+            futures = []
+            for file in source_wkw.list_files():
+                rel_file = path.relpath(file, source_wkw.root)
+                futures.append(
+                    executor.submit(
+                        compress_file_job, file, path.join(target_mag_path, rel_file)
+                    )
+                )

-    logging.info("Mag {0} succesfully compressed".format(str(mag)))
+            wait_and_ensure_success(futures)

+    logging.info("Mag {0} successfully compressed".format(str(mag)))

-def compress_mag_inplace(target_path, layer_name, mag: Mag, jobs):
+
+def compress_mag_inplace(target_path, layer_name, mag: Mag, jobs, args=None):
     compress_target_path = "{}.compress-{}".format(target_path, uuid4())
-    compress_mag(target_path, layer_name, compress_target_path, mag, jobs)
+    compress_mag(target_path, layer_name, compress_target_path, mag, jobs, args)

     shutil.rmtree(path.join(target_path, layer_name, str(mag)))
     shutil.move(

@@ -102,7 +111,7 @@ def compress_mag_inplace(target_path, layer_name, mag: Mag, jobs):


 def compress_mags(
-    source_path, layer_name, target_path=None, mags: List[Mag] = None, jobs=1
+    source_path, layer_name, target_path=None, mags: List[Mag] = None, jobs=1, args=None
 ):
     with_tmp_dir = target_path is None
     target_path = source_path + ".tmp" if with_tmp_dir else target_path

@@ -112,7 +121,7 @@ def compress_mags(
     mags.sort()

     for mag in mags:
-        compress_mag(source_path, layer_name, target_path, mag, jobs)
+        compress_mag(source_path, layer_name, target_path, mag, jobs, args)

     if with_tmp_dir:
         makedirs(path.join(source_path + ".bak", layer_name), exist_ok=True)

@@ -138,5 +147,10 @@ def compress_mags(
     if args.verbose:
         logging.basicConfig(level=logging.DEBUG)
     compress_mags(
-        args.source_path, args.layer_name, args.target_path, args.mag, int(args.jobs)
+        args.source_path,
+        args.layer_name,
+        args.target_path,
+        args.mag,
+        int(args.jobs),
+        args,
     )
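
Instead of relying on `ParallelExecutor` to block on completion, each tool now collects the futures returned by `executor.submit` and hands them to `wait_and_ensure_success` from `wkcuber/utils.py` (its diff is also not shown on this page). Conceptually the helper only needs to block on every future and surface worker failures; a minimal sketch under that assumption:

```python
# Assumed shape of the helper from wkcuber/utils.py (not shown in this diff).
def wait_and_ensure_success(futures):
    # Block until every submitted job is done; .result() re-raises any exception
    # raised in a worker, so a single failed job aborts the whole run.
    for future in futures:
        future.result()
```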

wkcuber/convert_knossos.py

Lines changed: 27 additions & 18 deletions
@@ -6,14 +6,15 @@
 from os import path

 from .utils import (
-    add_jobs_flag,
     add_verbose_flag,
     open_wkw,
     open_knossos,
     WkwDatasetInfo,
     KnossosDatasetInfo,
-    ParallelExecutor,
-    pool_get_lock,
+    ensure_wkw,
+    add_distribution_flags,
+    get_executor_for_args,
+    wait_and_ensure_success,
 )
 from .knossos import KnossosDataset, CUBE_EDGE_LEN

@@ -45,8 +46,8 @@ def create_parser():

     parser.add_argument("--mag", "-m", help="Magnification level", type=int, default=1)

-    add_jobs_flag(parser)
     add_verbose_flag(parser)
+    add_distribution_flags(parser)

     return parser

@@ -58,7 +59,7 @@ def convert_cube_job(cube_xyz, source_knossos_info, target_wkw_info):
     size = (CUBE_EDGE_LEN,) * 3

     with open_knossos(source_knossos_info) as source_knossos, open_wkw(
-        target_wkw_info, pool_get_lock()
+        target_wkw_info
     ) as target_wkw:
         cube_data = source_knossos.read(offset, size)
         target_wkw.write(offset, cube_data)

@@ -69,23 +70,30 @@ def convert_cube_job(cube_xyz, source_knossos_info, target_wkw_info):
     )


-def convert_knossos(source_path, target_path, layer_name, dtype, mag=1, jobs=1):
+def convert_knossos(
+    source_path, target_path, layer_name, dtype, mag=1, jobs=1, args=None
+):
     source_knossos_info = KnossosDatasetInfo(source_path, dtype)
     target_wkw_info = WkwDatasetInfo(target_path, layer_name, dtype, mag)

-    with open_knossos(source_knossos_info) as source_knossos, ParallelExecutor(
-        jobs
-    ) as pool:
-        knossos_cubes = list(source_knossos.list_cubes())
-        if len(knossos_cubes) == 0:
-            logging.error("No input KNOSSOS cubes found.")
-            exit(1)
+    ensure_wkw(target_wkw_info)

-        knossos_cubes.sort()
-        for cube_xyz in knossos_cubes:
-            pool.submit(
-                convert_cube_job, cube_xyz, source_knossos_info, target_wkw_info
-            )
+    with open_knossos(source_knossos_info) as source_knossos:
+        with get_executor_for_args(args) as executor:
+            knossos_cubes = list(source_knossos.list_cubes())
+            if len(knossos_cubes) == 0:
+                logging.error("No input KNOSSOS cubes found.")
+                exit(1)
+
+            knossos_cubes.sort()
+            futures = []
+            for cube_xyz in knossos_cubes:
+                futures.append(
+                    executor.submit(
+                        convert_cube_job, cube_xyz, source_knossos_info, target_wkw_info
+                    )
+                )
+            wait_and_ensure_success(futures)


 if __name__ == "__main__":

@@ -101,4 +109,5 @@ def convert_knossos(source_path, target_path, layer_name, dtype, mag=1, jobs=1):
         args.dtype,
         args.mag,
         args.jobs,
+        args,
     )
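
With `ParallelExecutor` and `pool_get_lock` gone, worker processes no longer share an in-process lock when opening the target dataset; instead `ensure_wkw` is called once up front so the wkw dataset already exists before any cube job runs, which also works when jobs run on other Slurm nodes. A hedged sketch of such a helper (the real definition sits in `wkcuber/utils.py` next to `open_wkw` and is not shown in this diff):

```python
# Assumed shape of the helper from wkcuber/utils.py (not shown in this diff);
# it reuses the open_wkw helper that this file already imports above.
def ensure_wkw(target_wkw_info, num_channels=1):
    # Opening the dataset once in the parent process creates it on disk, so the
    # worker jobs (possibly on other nodes) can open it without racing to
    # initialize it; this replaces the old pool_get_lock() coordination.
    with open_wkw(target_wkw_info, num_channels=num_channels):
        pass
```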

wkcuber/cubing.py

Lines changed: 28 additions & 17 deletions
@@ -3,16 +3,18 @@
 import numpy as np
 from argparse import ArgumentParser
 from os import path
+import cluster_tools

 from .utils import (
     get_chunks,
     find_files,
     add_verbose_flag,
-    add_jobs_flag,
     open_wkw,
+    ensure_wkw,
     WkwDatasetInfo,
-    ParallelExecutor,
-    pool_get_lock,
+    add_distribution_flags,
+    get_executor_for_args,
+    wait_and_ensure_success,
 )
 from .image_readers import image_reader

@@ -51,7 +53,7 @@ def create_parser():
     )

     add_verbose_flag(parser)
-    add_jobs_flag(parser)
+    add_distribution_flags(parser)

     return parser

@@ -91,9 +93,7 @@ def cubing_job(
     if len(z_batches) == 0:
         return

-    with open_wkw(
-        target_wkw_info, pool_get_lock(), num_channels=num_channels
-    ) as target_wkw:
+    with open_wkw(target_wkw_info, num_channels=num_channels) as target_wkw:
         # Iterate over batches of continuous z sections
         # The batches have a maximum size of `batch_size`
         # Batched iterations allows to utilize IO more efficiently

@@ -144,7 +144,9 @@ def cubing_job(
         raise exc


-def cubing(source_path, target_path, layer_name, dtype, batch_size, jobs) -> dict:
+def cubing(
+    source_path, target_path, layer_name, dtype, batch_size, jobs, args=None
+) -> dict:

     target_wkw_info = WkwDatasetInfo(target_path, layer_name, dtype, 1)
     source_files = find_source_filenames(source_path)

@@ -155,23 +157,31 @@ def cubing(source_path, target_path, layer_name, dtype, batch_size, jobs) -> dict:
     num_z = len(source_files)

     logging.info("Found source files: count={} size={}x{}".format(num_z, num_x, num_y))
-    with ParallelExecutor(jobs) as pool:
+
+    ensure_wkw(target_wkw_info, num_channels=num_channels)
+
+    with get_executor_for_args(args) as executor:
+        futures = []
         # We iterate over all z sections
         for z in range(0, num_z, BLOCK_LEN):
             # Prepare z batches
             max_z = min(num_z, z + BLOCK_LEN)
             z_batch = list(range(z, max_z))
             # Execute
-            pool.submit(
-                cubing_job,
-                target_wkw_info,
-                z_batch,
-                source_files[z:max_z],
-                batch_size,
-                (num_x, num_y),
-                num_channels,
+            futures.append(
+                executor.submit(
+                    cubing_job,
+                    target_wkw_info,
+                    z_batch,
+                    source_files[z:max_z],
+                    batch_size,
+                    (num_x, num_y),
+                    num_channels,
+                )
             )

+        wait_and_ensure_success(futures)
+
     # Return Bounding Box
     return {"topLeft": [0, 0, 0], "width": num_x, "height": num_y, "depth": num_z}

@@ -189,4 +199,5 @@ def cubing(source_path, target_path, layer_name, dtype, batch_size, jobs) -> dict:
         args.dtype,
         args.batch_size,
         args.jobs,
+        args=args,
     )
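
Every entry point now forwards the parsed `args` namespace into the worker-pool setup. When one of the tools is driven from Python rather than the CLI, the commit message's "use namedtuple to mimic args" suggests building a small stand-in namespace by hand; a hedged example (paths and field names are purely illustrative):

```python
from collections import namedtuple

from wkcuber.cubing import cubing

# Hypothetical stand-in for the argparse namespace; the field names mirror the
# CLI flags introduced in this commit and the paths are purely illustrative.
FakeArgs = namedtuple("FakeArgs", ["distribution_strategy", "jobs", "job_resources"])

if __name__ == "__main__":
    args = FakeArgs(distribution_strategy="multiprocessing", jobs=4, job_resources=None)
    bounding_box = cubing(
        "data/source_images",
        "data/target",
        "color",
        "uint8",
        batch_size=32,
        jobs=args.jobs,
        args=args,
    )
    print(bounding_box)
```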
