@@ -16,21 +16,9 @@ def __exit__(self, *args, **kwargs):
16
16
pass
17
17
18
18
19
- GIT_LOCK_THREAD = TRLock ()
20
- GIT_LOCK_PROCESS = DummyLock ()
21
- GIT_LOCK_DASK = DummyLock ()
22
-
23
-
24
- @contextlib .contextmanager
25
- def lock_git_operation ():
26
- """
27
- A context manager to lock git operations - it can be acquired once per thread, once per process,
28
- and once per dask worker.
29
- Note that this is a reentrant lock, so it can be acquired multiple times by the same thread/process/worker.
30
- """
31
-
32
- with GIT_LOCK_THREAD , GIT_LOCK_PROCESS , GIT_LOCK_DASK :
33
- yield
19
+ TRLOCK = TRLock ()
20
+ PRLOCK = DummyLock ()
21
+ DRLOCK = DummyLock ()
34
22
35
23
36
24
logger = logging .getLogger (__name__ )
@@ -39,12 +27,10 @@ def lock_git_operation():
39
27
class DaskRLock (DaskLock ):
40
28
"""A reentrant lock for dask that is always blocking and never times out."""
41
29
42
- def __init__ (self , * args , ** kwargs ):
43
- super ().__init__ (* args , ** kwargs )
44
- self ._rcount = 0
45
- self ._rdata = None
30
+ def acquire (self ):
31
+ if not hasattr (self , "_rcount" ):
32
+ self ._rcount = 0
46
33
47
- def acquire (self , * args ):
48
34
self ._rcount += 1
49
35
50
36
if self ._rcount == 1 :
@@ -53,29 +39,29 @@ def acquire(self, *args):
53
39
return self ._rdata
54
40
55
41
def release (self ):
56
- if self ._rcount == 0 :
42
+ if not hasattr ( self , "_rcount" ) or self ._rcount == 0 :
57
43
raise RuntimeError ("Lock not acquired so cannot be released!" )
58
44
59
45
self ._rcount -= 1
60
46
61
47
if self ._rcount == 0 :
62
- self . _rdata = None
48
+ delattr ( self , "_rdata" )
63
49
return super ().release ()
64
50
else :
65
51
return None
66
52
67
53
68
54
def _init_process (lock ):
69
- global GIT_LOCK_PROCESS
70
- GIT_LOCK_PROCESS = lock
55
+ global PRLOCK
56
+ PRLOCK = lock
71
57
72
58
73
59
def _init_dask (lock ):
74
- global GIT_LOCK_DASK
75
- # it appears we have to construct the lock by name instead
60
+ global DRLOCK
61
+ # it appears we have to construct the locak by name instead
76
62
# of passing the object itself
77
63
# otherwise dask uses a regular lock
78
- GIT_LOCK_DASK = DaskRLock (name = lock )
64
+ DRLOCK = DaskRLock (name = lock )
79
65
80
66
81
67
@contextlib .contextmanager
@@ -84,8 +70,8 @@ def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Execut
84
70
85
71
This allows us to easily use other executors as needed.
86
72
"""
87
- global GIT_LOCK_DASK
88
- global GIT_LOCK_PROCESS
73
+ global DRLOCK
74
+ global PRLOCK
89
75
90
76
if kind == "thread" :
91
77
with ThreadPoolExecutor (max_workers = max_workers ) as pool_t :
@@ -99,7 +85,7 @@ def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Execut
99
85
initargs = (lock ,),
100
86
) as pool_p :
101
87
yield pool_p
102
- GIT_LOCK_PROCESS = DummyLock ()
88
+ PRLOCK = DummyLock ()
103
89
elif kind in ["dask" , "dask-process" , "dask-thread" ]:
104
90
import dask
105
91
import distributed
@@ -115,6 +101,6 @@ def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Execut
115
101
with distributed .Client (cluster ) as client :
116
102
client .run (_init_dask , "cftick" )
117
103
yield ClientExecutor (client )
118
- GIT_LOCK_DASK = DummyLock ()
104
+ DRLOCK = DummyLock ()
119
105
else :
120
106
raise NotImplementedError ("That kind is not implemented" )
0 commit comments