11
11
# no need to worry about serializing the lock
12
12
SerializableLock = threading .Lock # type: ignore
13
13
14
- try :
15
- from dask .distributed import Lock as DistributedLock
16
- except ImportError :
17
- DistributedLock = None # type: ignore
18
-
19
14
20
15
# Locks used by multiple backends.
21
16
# Neither HDF5 nor the netCDF-C library are thread-safe.
@@ -41,14 +36,6 @@ def _get_multiprocessing_lock(key):
41
36
return multiprocessing .Lock ()
42
37
43
38
44
- _LOCK_MAKERS = {
45
- None : _get_threaded_lock ,
46
- "threaded" : _get_threaded_lock ,
47
- "multiprocessing" : _get_multiprocessing_lock ,
48
- "distributed" : DistributedLock ,
49
- }
50
-
51
-
52
39
def _get_lock_maker (scheduler = None ):
53
40
"""Returns an appropriate function for creating resource locks.
54
41
@@ -61,7 +48,23 @@ def _get_lock_maker(scheduler=None):
61
48
--------
62
49
dask.utils.get_scheduler_lock
63
50
"""
64
- return _LOCK_MAKERS [scheduler ]
51
+
52
+ if scheduler is None :
53
+ return _get_threaded_lock
54
+ elif scheduler == "threaded" :
55
+ return _get_threaded_lock
56
+ elif scheduler == "multiprocessing" :
57
+ return _get_multiprocessing_lock
58
+ elif scheduler == "distributed" :
59
+ # Lazy import distributed since it is can add a significant
60
+ # amount of time to import
61
+ try :
62
+ from dask .distributed import Lock as DistributedLock
63
+ except ImportError :
64
+ DistributedLock = None # type: ignore
65
+ return DistributedLock
66
+ else :
67
+ raise KeyError (scheduler )
65
68
66
69
67
70
def _get_scheduler (get = None , collection = None ) -> str | None :
@@ -128,15 +131,12 @@ def acquire(lock, blocking=True):
128
131
if blocking :
129
132
# no arguments needed
130
133
return lock .acquire ()
131
- elif DistributedLock is not None and isinstance (lock , DistributedLock ):
132
- # distributed.Lock doesn't support the blocking argument yet:
133
- # https://github.com/dask/distributed/pull/2412
134
- return lock .acquire (timeout = 0 )
135
134
else :
136
135
# "blocking" keyword argument not supported for:
137
136
# - threading.Lock on Python 2.
138
137
# - dask.SerializableLock with dask v1.0.0 or earlier.
139
138
# - multiprocessing.Lock calls the argument "block" instead.
139
+ # - dask.distributed.Lock uses the blocking argument as the first one
140
140
return lock .acquire (blocking )
141
141
142
142
0 commit comments