15
15
import abc
16
16
import json
17
17
import logging
18
+ import threading
18
19
import time
19
20
import uuid
20
21
@@ -394,6 +395,9 @@ class NodeRequest(BaseModel):
394
395
def __init__ (self , id = None ):
395
396
super (NodeRequest , self ).__init__ (id )
396
397
self .lock = None
398
+ # Local thread lock that is acquired when we are manipulating
399
+ # the ZK lock.
400
+ self ._thread_lock = threading .Lock ()
397
401
self .declined_by = []
398
402
self .node_types = []
399
403
self .nodes = []
@@ -510,6 +514,9 @@ def __init__(self, id=None):
510
514
super (Node , self ).__init__ (id )
511
515
# Local lock object; not serialized
512
516
self .lock = None
517
+ # Local thread lock that is acquired when we are manipulating
518
+ # the ZK lock.
519
+ self ._thread_lock = threading .Lock ()
513
520
# Cached list of lock contenders; not serialized (and possibly
514
521
# not up to date; use for status listings only).
515
522
self .lock_contenders = set ()
@@ -2138,22 +2145,23 @@ def lockNodeRequest(self, request, blocking=True, timeout=None):
2138
2145
log = get_annotated_logger (self .log , event_id = request .event_id ,
2139
2146
node_request_id = request .id )
2140
2147
path = self ._requestLockPath (request .id )
2141
- try :
2142
- lock = Lock (self .kazoo_client , path )
2143
- have_lock = lock .acquire (blocking , timeout )
2144
- except kze .LockTimeout :
2145
- raise npe .TimeoutException (
2146
- "Timeout trying to acquire lock %s" % path )
2147
- except kze .NoNodeError :
2148
- have_lock = False
2149
- log .error ("Request not found for locking: %s" , request )
2148
+ with request ._thread_lock :
2149
+ try :
2150
+ lock = Lock (self .kazoo_client , path )
2151
+ have_lock = lock .acquire (blocking , timeout )
2152
+ except kze .LockTimeout :
2153
+ raise npe .TimeoutException (
2154
+ "Timeout trying to acquire lock %s" % path )
2155
+ except kze .NoNodeError :
2156
+ have_lock = False
2157
+ log .error ("Request not found for locking: %s" , request )
2150
2158
2151
- # If we aren't blocking, it's possible we didn't get the lock
2152
- # because someone else has it.
2153
- if not have_lock :
2154
- raise npe .ZKLockException ("Did not get lock on %s" % path )
2159
+ # If we aren't blocking, it's possible we didn't get the lock
2160
+ # because someone else has it.
2161
+ if not have_lock :
2162
+ raise npe .ZKLockException ("Did not get lock on %s" % path )
2155
2163
2156
- request .lock = lock
2164
+ request .lock = lock
2157
2165
2158
2166
# Do an in-place update of the node request so we have the latest data
2159
2167
self .updateNodeRequest (request )
@@ -2171,8 +2179,9 @@ def unlockNodeRequest(self, request):
2171
2179
if request .lock is None :
2172
2180
raise npe .ZKLockException (
2173
2181
"Request %s does not hold a lock" % request )
2174
- request .lock .release ()
2175
- request .lock = None
2182
+ with request ._thread_lock :
2183
+ request .lock .release ()
2184
+ request .lock = None
2176
2185
2177
2186
def lockNode (self , node , blocking = True , timeout = None ,
2178
2187
ephemeral = True , identifier = None ):
@@ -2199,22 +2208,23 @@ def lockNode(self, node, blocking=True, timeout=None,
2199
2208
and could not get the lock, or a lock is already held.
2200
2209
'''
2201
2210
path = self ._nodeLockPath (node .id )
2202
- try :
2203
- lock = Lock (self .kazoo_client , path , identifier )
2204
- have_lock = lock .acquire (blocking , timeout , ephemeral )
2205
- except kze .LockTimeout :
2206
- raise npe .TimeoutException (
2207
- "Timeout trying to acquire lock %s" % path )
2208
- except kze .NoNodeError :
2209
- have_lock = False
2210
- self .log .error ("Node not found for locking: %s" , node )
2211
+ with node ._thread_lock :
2212
+ try :
2213
+ lock = Lock (self .kazoo_client , path , identifier )
2214
+ have_lock = lock .acquire (blocking , timeout , ephemeral )
2215
+ except kze .LockTimeout :
2216
+ raise npe .TimeoutException (
2217
+ "Timeout trying to acquire lock %s" % path )
2218
+ except kze .NoNodeError :
2219
+ have_lock = False
2220
+ self .log .error ("Node not found for locking: %s" , node )
2211
2221
2212
- # If we aren't blocking, it's possible we didn't get the lock
2213
- # because someone else has it.
2214
- if not have_lock :
2215
- raise npe .ZKLockException ("Did not get lock on %s" % path )
2222
+ # If we aren't blocking, it's possible we didn't get the lock
2223
+ # because someone else has it.
2224
+ if not have_lock :
2225
+ raise npe .ZKLockException ("Did not get lock on %s" % path )
2216
2226
2217
- node .lock = lock
2227
+ node .lock = lock
2218
2228
2219
2229
# Do an in-place update of the node so we have the latest data.
2220
2230
self .updateNode (node )
@@ -2231,8 +2241,9 @@ def unlockNode(self, node):
2231
2241
'''
2232
2242
if node .lock is None :
2233
2243
raise npe .ZKLockException ("Node %s does not hold a lock" % node )
2234
- node .lock .release ()
2235
- node .lock = None
2244
+ with node ._thread_lock :
2245
+ node .lock .release ()
2246
+ node .lock = None
2236
2247
2237
2248
def forceUnlockNode (self , node ):
2238
2249
'''
0 commit comments