15
15
16
16
"""
17
17
import sys
18
+
18
19
try :
19
20
from time import monotonic as now
20
21
except ImportError :
27
28
CancelledError ,
28
29
KazooException ,
29
30
LockTimeout ,
30
- NoNodeError
31
+ NoNodeError ,
31
32
)
32
33
from kazoo .protocol .states import KazooState
33
34
from kazoo .retry import (
34
35
ForceRetryError ,
35
36
KazooRetry ,
36
- RetryFailedError
37
+ RetryFailedError ,
37
38
)
38
39
39
40
@@ -80,20 +81,33 @@ class Lock(object):
80
81
81
82
# Node names which exclude this contender when present at a lower
82
83
# sequence number. Involved in read/write locks.
83
- _EXCLUDE_NAMES = ["__lock__" , "-lock-" ]
84
+ _EXCLUDE_NAMES = ["__lock__" ]
84
85
85
- def __init__ (self , client , path , identifier = None ):
86
+ def __init__ (
87
+ self , client , path , identifier = None , additional_lock_patterns = ()
88
+ ):
86
89
"""Create a Kazoo lock.
87
90
88
91
:param client: A :class:`~kazoo.client.KazooClient` instance.
89
92
:param path: The lock path to use.
90
- :param identifier: Name to use for this lock contender. This
91
- can be useful for querying to see who the
92
- current lock contenders are.
93
-
93
+ :param identifier: Name to use for this lock contender. This can be
94
+ useful for querying to see who the current lock
95
+ contenders are.
96
+ :param additional_lock_patterns: Strings that will be used to
97
+ identify other znode in the path
98
+ that should be considered contenders
99
+ for this lock.
100
+ Use this for cross-implementation
101
+ compatibility.
102
+
103
+ .. versionadded:: 2.7.1
104
+ The additional_lock_patterns option.
94
105
"""
95
106
self .client = client
96
107
self .path = path
108
+ self ._exclude_names = set (
109
+ self ._EXCLUDE_NAMES + list (additional_lock_patterns )
110
+ )
97
111
98
112
# some data is written to the node. this can be queried via
99
113
# contenders() to see who is contending for the lock
@@ -113,8 +127,9 @@ def __init__(self, client, path, identifier=None):
113
127
self .is_acquired = False
114
128
self .assured_path = False
115
129
self .cancelled = False
116
- self ._retry = KazooRetry (max_tries = None ,
117
- sleep_func = client .handler .sleep_func )
130
+ self ._retry = KazooRetry (
131
+ max_tries = None , sleep_func = client .handler .sleep_func
132
+ )
118
133
self ._lock = client .handler .lock_object ()
119
134
120
135
def _ensure_path (self ):
@@ -179,9 +194,12 @@ def _acquire_lock():
179
194
try :
180
195
gotten = False
181
196
try :
182
- gotten = retry (self ._inner_acquire ,
183
- blocking = blocking , timeout = timeout ,
184
- ephemeral = ephemeral )
197
+ gotten = retry (
198
+ self ._inner_acquire ,
199
+ blocking = blocking ,
200
+ timeout = timeout ,
201
+ ephemeral = ephemeral ,
202
+ )
185
203
except RetryFailedError :
186
204
pass
187
205
except KazooException :
@@ -222,8 +240,9 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
222
240
self .create_tried = True
223
241
224
242
if not node :
225
- node = self .client .create (self .create_path , self .data ,
226
- ephemeral = ephemeral , sequence = True )
243
+ node = self .client .create (
244
+ self .create_path , self .data , ephemeral = ephemeral , sequence = True
245
+ )
227
246
# strip off path to node
228
247
node = node [len (self .path ) + 1 :]
229
248
@@ -263,14 +282,16 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
263
282
else :
264
283
self .wake_event .wait (timeout )
265
284
if not self .wake_event .isSet ():
266
- raise LockTimeout ("Failed to acquire lock on %s after "
267
- "%s seconds" % (self .path , timeout ))
285
+ raise LockTimeout (
286
+ "Failed to acquire lock on %s after %s seconds"
287
+ % (self .path , timeout )
288
+ )
268
289
finally :
269
290
self .client .remove_listener (self ._watch_session )
270
291
271
292
def predecessor (self , children , index ):
272
293
for c in reversed (children [:index ]):
273
- if any (n in c for n in self ._EXCLUDE_NAMES ):
294
+ if any (n in c for n in self ._exclude_names ):
274
295
return c
275
296
return None
276
297
@@ -289,12 +310,13 @@ def _get_sorted_children(self):
289
310
# (eg. in case of a lease), just sort them last ('~' sorts after all
290
311
# ASCII digits).
291
312
def _seq (c ):
292
- for name in [ "__lock__" , "-lock-" , "__rlock__" ] :
313
+ for name in self . _exclude_names :
293
314
idx = c .find (name )
294
315
if idx != - 1 :
295
316
return c [idx + len (name ):]
296
317
# Sort unknown node names eg. "lease_holder" last.
297
318
return '~'
319
+
298
320
children .sort (key = _seq )
299
321
return children
300
322
@@ -391,8 +413,9 @@ class WriteLock(Lock):
391
413
shared lock.
392
414
393
415
"""
416
+
394
417
_NODE_NAME = "__lock__"
395
- _EXCLUDE_NAMES = ["__lock__" , "-lock-" , " __rlock__" ]
418
+ _EXCLUDE_NAMES = ["__lock__" , "__rlock__" ]
396
419
397
420
398
421
class ReadLock (Lock ):
@@ -420,8 +443,9 @@ class ReadLock(Lock):
420
443
shared lock.
421
444
422
445
"""
446
+
423
447
_NODE_NAME = "__rlock__"
424
- _EXCLUDE_NAMES = ["__lock__" , "-lock-" ]
448
+ _EXCLUDE_NAMES = ["__lock__" ]
425
449
426
450
427
451
class Semaphore (object ):
@@ -458,6 +482,7 @@ class Semaphore(object):
458
482
The max_leases check.
459
483
460
484
"""
485
+
461
486
def __init__ (self , client , path , identifier = None , max_leases = 1 ):
462
487
"""Create a Kazoo Lock
463
488
@@ -509,8 +534,8 @@ def _ensure_path(self):
509
534
else :
510
535
if leases != self .max_leases :
511
536
raise ValueError (
512
- "Inconsistent max leases: %s, expected: %s" %
513
- (leases , self .max_leases )
537
+ "Inconsistent max leases: %s, expected: %s"
538
+ % (leases , self .max_leases )
514
539
)
515
540
else :
516
541
self .client .set (self .path , str (self .max_leases ).encode ('utf-8' ))
@@ -548,7 +573,8 @@ def acquire(self, blocking=True, timeout=None):
548
573
549
574
try :
550
575
self .is_acquired = self .client .retry (
551
- self ._inner_acquire , blocking = blocking , timeout = timeout )
576
+ self ._inner_acquire , blocking = blocking , timeout = timeout
577
+ )
552
578
except KazooException :
553
579
# if we did ultimately fail, attempt to clean up
554
580
self ._best_effort_cleanup ()
@@ -590,8 +616,9 @@ def _inner_acquire(self, blocking, timeout=None):
590
616
self .wake_event .wait (w .leftover ())
591
617
if not self .wake_event .isSet ():
592
618
raise LockTimeout (
593
- "Failed to acquire semaphore on %s "
594
- "after %s seconds" % (self .path , timeout ))
619
+ "Failed to acquire semaphore on %s"
620
+ " after %s seconds" % (self .path , timeout )
621
+ )
595
622
else :
596
623
return False
597
624
finally :
@@ -612,8 +639,9 @@ def _get_lease(self, data=None):
612
639
# Get a list of the current potential lock holders. If they change,
613
640
# notify our wake_event object. This is used to unblock a blocking
614
641
# self._inner_acquire call.
615
- children = self .client .get_children (self .path ,
616
- self ._watch_lease_change )
642
+ children = self .client .get_children (
643
+ self .path , self ._watch_lease_change
644
+ )
617
645
618
646
# If there are leases available, acquire one
619
647
if len (children ) < self .max_leases :
0 commit comments