@@ -16,9 +16,21 @@ def __exit__(self, *args, **kwargs):
16
16
pass
17
17
18
18
19
- TRLOCK = TRLock ()
20
- PRLOCK = DummyLock ()
21
- DRLOCK = DummyLock ()
19
+ GIT_LOCK_THREAD = TRLock ()
20
+ GIT_LOCK_PROCESS = DummyLock ()
21
+ GIT_LOCK_DASK = DummyLock ()
22
+
23
+
24
+ @contextlib .contextmanager
25
+ def lock_git_operation ():
26
+ """
27
+ A context manager to lock git operations - it can be acquired once per thread, once per process,
28
+ and once per dask worker.
29
+ Note that this is a reentrant lock, so it can be acquired multiple times by the same thread/process/worker.
30
+ """
31
+
32
+ with GIT_LOCK_THREAD , GIT_LOCK_PROCESS , GIT_LOCK_DASK :
33
+ yield
22
34
23
35
24
36
logger = logging .getLogger (__name__ )
@@ -27,10 +39,12 @@ def __exit__(self, *args, **kwargs):
27
39
class DaskRLock (DaskLock ):
28
40
"""A reentrant lock for dask that is always blocking and never times out."""
29
41
30
- def acquire (self ):
31
- if not hasattr (self , "_rcount" ):
32
- self ._rcount = 0
42
+ def __init__ (self , * args , ** kwargs ):
43
+ super ().__init__ (* args , ** kwargs )
44
+ self ._rcount = 0
45
+ self ._rdata = None
33
46
47
+ def acquire (self , * args ):
34
48
self ._rcount += 1
35
49
36
50
if self ._rcount == 1 :
@@ -39,29 +53,29 @@ def acquire(self):
39
53
return self ._rdata
40
54
41
55
def release (self ):
42
- if not hasattr ( self , "_rcount" ) or self ._rcount == 0 :
56
+ if self ._rcount == 0 :
43
57
raise RuntimeError ("Lock not acquired so cannot be released!" )
44
58
45
59
self ._rcount -= 1
46
60
47
61
if self ._rcount == 0 :
48
- delattr ( self , " _rdata" )
62
+ self . _rdata = None
49
63
return super ().release ()
50
64
else :
51
65
return None
52
66
53
67
54
68
def _init_process (lock ):
55
- global PRLOCK
56
- PRLOCK = lock
69
+ global GIT_LOCK_PROCESS
70
+ GIT_LOCK_PROCESS = lock
57
71
58
72
59
73
def _init_dask (lock ):
60
- global DRLOCK
61
- # it appears we have to construct the locak by name instead
74
+ global GIT_LOCK_DASK
75
+ # it appears we have to construct the lock by name instead
62
76
# of passing the object itself
63
77
# otherwise dask uses a regular lock
64
- DRLOCK = DaskRLock (name = lock )
78
+ GIT_LOCK_DASK = DaskRLock (name = lock )
65
79
66
80
67
81
@contextlib .contextmanager
@@ -70,8 +84,8 @@ def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Execut
70
84
71
85
This allows us to easily use other executors as needed.
72
86
"""
73
- global DRLOCK
74
- global PRLOCK
87
+ global GIT_LOCK_DASK
88
+ global GIT_LOCK_PROCESS
75
89
76
90
if kind == "thread" :
77
91
with ThreadPoolExecutor (max_workers = max_workers ) as pool_t :
@@ -85,7 +99,7 @@ def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Execut
85
99
initargs = (lock ,),
86
100
) as pool_p :
87
101
yield pool_p
88
- PRLOCK = DummyLock ()
102
+ GIT_LOCK_PROCESS = DummyLock ()
89
103
elif kind in ["dask" , "dask-process" , "dask-thread" ]:
90
104
import dask
91
105
import distributed
@@ -101,6 +115,6 @@ def executor(kind: str, max_workers: int, daemon=True) -> typing.Iterator[Execut
101
115
with distributed .Client (cluster ) as client :
102
116
client .run (_init_dask , "cftick" )
103
117
yield ClientExecutor (client )
104
- DRLOCK = DummyLock ()
118
+ GIT_LOCK_DASK = DummyLock ()
105
119
else :
106
120
raise NotImplementedError ("That kind is not implemented" )
0 commit comments