|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import argparse |
| 4 | +import asyncio |
4 | 5 | import time |
5 | 6 | from asyncio import Future |
6 | 7 | from typing import Callable, Iterable, Any |
7 | 8 |
|
8 | 9 | from dask.distributed import LocalCluster, Client |
| 10 | +from distributed import connect |
9 | 11 |
|
10 | 12 | from pyhdx.config import cfg |
11 | 13 | from pyhdx.support import select_config |
@@ -75,12 +77,22 @@ def default_cluster(**kwargs): |
75 | 77 | def verify_cluster(scheduler_address, timeout="2s"): |
76 | 78 | """Check if a valid dask scheduler is running at the provided scheduler_address""" |
77 | 79 | try: |
78 | | - client = Client(scheduler_address, timeout=timeout) |
| 80 | + asyncio.run(connect(scheduler_address, timeout=timeout)) |
79 | 81 | return True |
80 | | - except (TimeoutError, IOError): |
| 82 | + except (TimeoutError, OSError): |
| 83 | + return False |
| 84 | + |
| 85 | + |
| 86 | +def verify_cluster_async(scheduler_address, timeout="2s"): |
| 87 | + """Check if a valid dask scheduler is running at the provided scheduler_address""" |
| 88 | + try: |
| 89 | + asyncio.run(connect(scheduler_address, timeout=timeout)) |
| 90 | + return True |
| 91 | + except (TimeoutError, OSError): |
81 | 92 | return False |
82 | 93 |
|
83 | 94 |
|
| 95 | + |
84 | 96 | def blocking_cluster(): |
85 | 97 | """Start a dask LocalCluster and block until iterrupted""" |
86 | 98 | parser = argparse.ArgumentParser(description="Start a new Dask local cluster") |
|
0 commit comments