This repository was archived by the owner on Dec 1, 2025. It is now read-only.

Commit 5bef90b

Authored by JacksonMaxfield and anl

feature/add-aics-cluster-check (#2)
* Add AICS cluster deep check script
* Add README to describe why the script exists and how to run it
* Add comment to README about where the dask worker logs will be saved
* Fix lint error
* Fix typo in scripts README
* Fix typo in check_aics_cluster, resource -> resources
* Convert to pytest for parametrized deep checks
* Rename file from check to test
* Fix bugs in test script
* Fix lint and bugs in test script
* Remove extra logging config
* Switch from log to print
* Remove extra caplog
* Try using pytest.main
* Try calling the tests plainly
* Switch from print to logging and switch from pytest to itertools
* Remove pytest from deepcheck requirements
* Import itertools
* Add log statement for which cluster is currently running
* Fix minor log statement organization
* Update README
* Add time sleep after each deep check runs
* Only run a single test parameter set on the large worker check
* Run black on script
* Change cluster config parameter sets
* Fix typo in README (Co-authored-by: Andy Leonard <[email protected]>)
* Add exception handling
* Remove extra logging statements
* Record IO time results, log the best config, and save all results
* Black and flake8
* Drop n workers on large workers check to match updated node availability
* Move scheduler work stealing to immediately after import
* Remove n workers 256 checks
* Upgrade distributed version pin
* Remove dask worker stealing as it doesn't matter for these tests
* Lower time.sleep amount
* Clean up a bit of how the checks are run
* Add logging comment about IO iterations start
* Fix completion time return

Co-authored-by: Andy Leonard <[email protected]>
1 parent aaf8360 · commit 5bef90b

File tree: 6 files changed (+294 additions, −11 deletions)

aics_dask_utils/__init__.py

Lines changed: 2 additions & 3 deletions

```diff
@@ -2,6 +2,8 @@

 """Top-level package for AICS Dask Utils."""

+from .distributed_handler import DEFAULT_MAX_THREADS, DistributedHandler  # noqa: F401
+
 __author__ = "Jackson Maxfield Brown"
 __email__ = "[email protected]"
 # Do not edit this string manually, always use bumpversion
@@ -11,6 +13,3 @@

 def get_module_version():
     return __version__
-
-
-from .distributed_handler import DEFAULT_MAX_THREADS, DistributedHandler  # noqa: F401
```
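This change moves the re-export to the top of `__init__.py` (it previously sat, unusually, after `get_module_version`), keeping `from aics_dask_utils import DistributedHandler` working for the new script. A minimal sketch of that import path, assuming a throwaway `LocalCluster` in place of the SLURM cluster the script actually targets:

```python
# Sketch only: exercises the package-level re-export added above.
# A LocalCluster stands in for the SLURM cluster used by the real script.
from distributed import LocalCluster

from aics_dask_utils import DistributedHandler

cluster = LocalCluster(n_workers=2)

# As in scripts/test_aics_cluster.py, the handler is constructed from a
# scheduler address and used as a context manager
with DistributedHandler(cluster.scheduler_address) as handler:
    results = handler.batched_map(lambda a, b: a + b, range(10), range(10))

print(results)  # assumes batched_map gathers and returns the mapped results
cluster.close()
```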

scripts/README.md

Lines changed: 24 additions & 0 deletions

# Scripts

## test_aics_cluster.py

A script to run deep checks on the AICS SLURM cluster and its interactions with Dask.
It checks:

* All SLURM nodes (or at least many SLURM nodes)
* `distributed.Client.wait_for_workers`, to ensure that all nodes start up as workers
* IO with `aicsimageio>=3.2.0`, to ensure that Dask IO isn't failing on any worker
* Timings for completing the 10000 IO iterations against the various cluster configs

### Full Commands to Run (start on SLURM master)

```bash
srun -c 8 --mem 40GB -p aics_cpu_general --pty bash
git clone git@github.com:AllenCellModeling/aics_dask_utils.git
cd aics_dask_utils
conda create --name aics_dask_utils python=3.7 -y
conda activate aics_dask_utils
pip install -e .[deepcheck]
python scripts/test_aics_cluster.py
```

Dask worker logs from each check will be placed in:
`.dask_logs/{cluster_creation_time}/{config}/{test_type}`
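For example (hypothetical values): a `wait_for_workers` check launched at `2020-05-12T10:30:00` with a 4-core, 16GB, 64-worker config would write its logs under `.dask_logs/2020-05-12T10:30:00/c_4-mem_16GB-workers_64/wait_for_workers/`, matching the `log_dir` path built in `spawn_cluster` below.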

scripts/resources/example.ome.tiff

1.93 MB
Binary file not shown.

scripts/test_aics_cluster.py

Lines changed: 255 additions & 0 deletions

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import itertools
import json
import logging
import signal
import time
from datetime import datetime
from pathlib import Path

import dask.config
from aicsimageio import AICSImage
from dask_jobqueue import SLURMCluster
from distributed import Client
from imageio import imwrite

from aics_dask_utils import DistributedHandler

###############################################################################

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s",
)
log = logging.getLogger(__name__)

###############################################################################
# Test function definitions


def spawn_cluster(
    cluster_type: str, cores_per_worker: int, memory_per_worker: str, n_workers: int
) -> Client:
    # Create or get log dir
    log_dir_name = f"c_{cores_per_worker}-mem_{memory_per_worker}-workers_{n_workers}"
    log_dir_time = datetime.now().isoformat().split(".")[0]  # Do not include ms
    log_dir = Path(
        f".dask_logs/{log_dir_time}/{log_dir_name}/{cluster_type}"
    ).expanduser()
    # Log dir settings
    log_dir.mkdir(parents=True, exist_ok=True)

    # Create cluster
    log.info("Creating SLURMCluster")
    cluster = SLURMCluster(
        cores=cores_per_worker,
        memory=memory_per_worker,
        queue="aics_cpu_general",
        walltime="10:00:00",
        local_directory=str(log_dir),
        log_directory=str(log_dir),
    )

    # Create client connection
    client = Client(cluster)
    log.info(f"Dask dashboard available at: {cluster.dashboard_link}")

    return client


def signal_handler(signum, frame):
    raise TimeoutError()


def run_wait_for_workers_check(client: Client, timeout: int, n_workers: int):
    log.info("Checking wait for workers...")

    # `client.wait_for_workers` is a blocking function; the signal library
    # lets us wrap blocking statements in a handler that raises on timeout
    try:
        # Set up signal check for timeout duration
        signal.signal(signal.SIGALRM, signal_handler)
        signal.alarm(timeout)

        # Actual wait for workers
        client.cluster.scale(n_workers)
        client.wait_for_workers(n_workers)

        # Close signal
        signal.alarm(0)
        log.info("Successfully waited for workers!")
    except TimeoutError:
        log.error("Timed out wait for workers check")

    log.info("Wait for workers check done.")


def run_iteration(file: Path, save_path: Path) -> Path:
    # Read image
    img = AICSImage(file)

    # Select middle slice of structure channel
    data = img.get_image_data(
        "YX", S=0, T=0, C=img.get_channel_names().index("structure"), Z=img.size_z // 2,
    )

    # Write out image as png
    imwrite(save_path, data)

    return save_path


def run_image_read_checks(client: Client, n_workers: int):
    log.info("Checking IO iterations...")

    # Spawn workers if they aren't all there
    client.cluster.scale(n_workers)

    # Get test image path
    source_image = Path(__file__).parent / "resources" / "example.ome.tiff"

    # Log time duration
    start = time.perf_counter()

    # Run check iterations
    with DistributedHandler(client.cluster.scheduler_address) as handler:
        handler.batched_map(
            run_iteration,
            [source_image for i in range(10000)],
            [source_image.parent / f"{i}.png" for i in range(10000)],
        )

    completion_time = time.perf_counter() - start
    log.info(f"IO checks completed in: {completion_time} seconds")

    return completion_time


def deep_cluster_check(
    cores_per_worker: int,
    memory_per_worker: str,
    n_workers: int,
    timeout: int = 120,  # seconds
):
    completion_time = None
    try:
        log.info(f"Running tests with config: {locals()}")

        log.info("Spawning SLURMCluster...")
        client = spawn_cluster(
            cluster_type="wait_for_workers",
            cores_per_worker=cores_per_worker,
            memory_per_worker=memory_per_worker,
            n_workers=n_workers,
        )

        run_wait_for_workers_check(client=client, timeout=timeout, n_workers=n_workers)
        completion_time = run_image_read_checks(client=client, n_workers=n_workers)

        log.info("Tearing down cluster.")
        client.shutdown()
        client.close()
        log.info("-" * 80)

        log.info("Waiting a bit for full cluster teardown")
        time.sleep(30)

        log.info("All checks complete")
    except Exception as e:
        log.error("An error occurred:")
        log.error(e)

    return completion_time


########################################################################################
# Actual tests


def test_small_workers():
    """
    Run the deep cluster check with small workers.
    Memory per worker is set to 4GB * cores per worker.
    Timeout uses the deep cluster check default.

    This is to test the scaling of Dask on SLURM.
    """
    # Run tests
    results = []
    params = itertools.product([1, 2, 4], [12, 64, 128])
    for cores_per_worker, n_workers in params:
        completion_time = deep_cluster_check(
            cores_per_worker=cores_per_worker,
            memory_per_worker=f"{cores_per_worker * 4}GB",
            n_workers=n_workers,
        )
        log.info("=" * 80)

        results.append(
            {
                "cores_per_worker": cores_per_worker,
                "n_workers": n_workers,
                "completion_time": completion_time,
            }
        )

    # Get best config
    best = None
    for config in results:
        if config["completion_time"] is not None:
            if (
                # Handle starting case
                (best is None)
                # Handle new better case
                or (config["completion_time"] < best["completion_time"])
            ):
                best = config

    # Log best config
    log.info("=" * 80)
    log.info(f"Cluster config with lowest IO completion_time: {best}")
    log.info("=" * 80)

    # Save results
    with open("aics_cluster_time_results.json", "w") as write_out:
        json.dump(results, write_out)


def test_large_workers():
    """
    Run the deep cluster check with large workers.
    Cores per worker is set to 1 for simplicity.
    Memory per worker is set to 160GB for all tests to lock down a single node.
    N workers is set to 20, the number of nodes listed as "IDLE" + "MIX" from `sinfo`.
    Timeout uses the deep cluster check default.

    This is to test that all nodes of the cluster are available.
    """
    deep_cluster_check(
        cores_per_worker=1, memory_per_worker="160GB", n_workers=20,
    )
    log.info("=" * 80)


###############################################################################
# Runner


def main():
    test_large_workers()
    log.info("=" * 80)
    log.info("All nodes checked, moving on to cluster config checks.")
    log.info("=" * 80)
    test_small_workers()


###############################################################################
# Allow caller to directly run this module (usually in development scenarios)

if __name__ == "__main__":
    main()
```
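A note on the design: per the commit history, the sweep in `test_small_workers` uses `itertools.product` rather than `pytest.mark.parametrize`, expanding to nine `(cores_per_worker, n_workers)` pairs that run sequentially against the live SLURM cluster. This lets the script run plainly as `python scripts/test_aics_cluster.py` and keeps pytest out of the `deepcheck` extra.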

setup.py

Lines changed: 12 additions & 4 deletions

```diff
@@ -46,20 +46,30 @@
 ]

 requirements = [
-    "distributed>=2.12.0",
+    "distributed>=2.19.0",
+]
+
+deepcheck_requirements = [
+    "aicsimageio>=3.2.0",
+    "bokeh>=2.0.2",
+    "dask>=2.16.0",
+    "dask_jobqueue>=0.7.1",
+    "imageio>=2.8.0",
 ]

 extra_requirements = {
     "test": test_requirements,
     "setup": setup_requirements,
     "dev": dev_requirements,
     "interactive": interactive_requirements,
+    "deepcheck": deepcheck_requirements,
     "all": [
         *requirements,
         *test_requirements,
         *setup_requirements,
         *dev_requirements,
         *interactive_requirements,
+        *deepcheck_requirements,
     ],
 }

@@ -76,9 +86,7 @@
         "Programming Language :: Python :: 3.8",
     ],
     description="Utility functions and documentation related to Dask and AICS",
-    entry_points={
-        "console_scripts": ["my_example=aics_dask_utils.bin.my_example:main"],
-    },
+    entry_points={},
     install_requires=requirements,
     license="Allen Institute Software License",
     long_description=readme,
```
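With the `deepcheck` extra split out this way, `distributed` remains the package's only runtime requirement, while the deep-check dependencies install on demand via `pip install -e .[deepcheck]`, as shown in the README above.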

tox.ini

Lines changed: 1 addition & 4 deletions

```diff
@@ -1,6 +1,3 @@
-[flake8]
-exclude = aics_dask_utils/vendor/*
-
 [tox]
 skipsdist = True
 envlist = py36, py37, py38, lint
@@ -13,7 +10,7 @@ markers =
 deps =
     .[test]
 commands =
-    flake8 aics_dask_utils --count --verbose --max-line-length=127 --show-source --statistics
+    flake8 aics_dask_utils --count --verbose --show-source --statistics
     black --check aics_dask_utils

 [testenv]
```
