14
14
and/or the lease has been lost.
15
15
16
16
"""
17
+ import re
17
18
import sys
18
19
19
20
try :
@@ -83,35 +84,38 @@ class Lock(object):
83
84
# sequence number. Involved in read/write locks.
84
85
_EXCLUDE_NAMES = ["__lock__" ]
85
86
86
- def __init__ (
87
- self , client , path , identifier = None , additional_lock_patterns = ()
88
- ):
87
+ def __init__ (self , client , path , identifier = None , extra_lock_patterns = ()):
89
88
"""Create a Kazoo lock.
90
89
91
90
:param client: A :class:`~kazoo.client.KazooClient` instance.
92
91
:param path: The lock path to use.
93
92
:param identifier: Name to use for this lock contender. This can be
94
93
useful for querying to see who the current lock
95
94
contenders are.
96
- :param additional_lock_patterns : Strings that will be used to
97
- identify other znode in the path
98
- that should be considered contenders
99
- for this lock.
100
- Use this for cross-implementation
101
- compatibility.
95
+ :param extra_lock_patterns : Strings that will be used to
96
+ identify other znode in the path
97
+ that should be considered contenders
98
+ for this lock.
99
+ Use this for cross-implementation
100
+ compatibility.
102
101
103
102
.. versionadded:: 2.7.1
104
- The additional_lock_patterns option.
103
+ The extra_lock_patterns option.
105
104
"""
106
105
self .client = client
107
106
self .path = path
108
107
self ._exclude_names = set (
109
- self ._EXCLUDE_NAMES + list (additional_lock_patterns )
108
+ self ._EXCLUDE_NAMES + list (extra_lock_patterns )
109
+ )
110
+ self ._contenders_re = re .compile (
111
+ r"(?:{patterns})(-?\d{{10}})$" .format (
112
+ patterns = "|" .join (self ._exclude_names )
113
+ )
110
114
)
111
115
112
116
# some data is written to the node. this can be queried via
113
117
# contenders() to see who is contending for the lock
114
- self .data = str (identifier or "" ).encode (' utf-8' )
118
+ self .data = str (identifier or "" ).encode (" utf-8" )
115
119
self .node = None
116
120
117
121
self .wake_event = client .handler .event_object ()
@@ -186,6 +190,7 @@ def _acquire_lock():
186
190
return False
187
191
if not locked :
188
192
# Lock acquire doesn't take a timeout, so simulate it...
193
+ # XXX: This is not true in Py3 >= 3.2
189
194
try :
190
195
locked = retry (_acquire_lock )
191
196
except RetryFailedError :
@@ -255,18 +260,8 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
255
260
if self .cancelled :
256
261
raise CancelledError ()
257
262
258
- children = self ._get_sorted_children ()
259
-
260
- try :
261
- our_index = children .index (node )
262
- except ValueError : # pragma: nocover
263
- # somehow we aren't in the children -- probably we are
264
- # recovering from a session failure and our ephemeral
265
- # node was removed
266
- raise ForceRetryError ()
267
-
268
- predecessor = self .predecessor (children , our_index )
269
- if not predecessor :
263
+ predecessor = self ._get_predecessor (node )
264
+ if predecessor is None :
270
265
return True
271
266
272
267
if not blocking :
@@ -289,36 +284,44 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
289
284
finally :
290
285
self .client .remove_listener (self ._watch_session )
291
286
292
- def predecessor (self , children , index ):
293
- for c in reversed (children [:index ]):
294
- if any (n in c for n in self ._exclude_names ):
295
- return c
296
- return None
297
-
298
287
def _watch_predecessor (self , event ):
299
288
self .wake_event .set ()
300
289
301
- def _get_sorted_children (self ):
290
+ def _get_predecessor (self , node ):
291
+ """returns `node`'s predecessor or None
292
+
293
+ Note: This handle the case where the current lock is not a contender
294
+ (e.g. rlock), this and also edge cases where the lock's ephemeral node
295
+ is gone.
296
+ """
302
297
children = self .client .get_children (self .path )
298
+ found_self = False
299
+ # Filter out the contenders using the computed regex
300
+ contender_matches = []
301
+ for child in children :
302
+ match = self ._contenders_re .search (child )
303
+ if match is not None :
304
+ contender_matches .append (match )
305
+ if child == node :
306
+ # Remember the node's match object so we can short circuit
307
+ # below.
308
+ found_self = match
309
+
310
+ if found_self is False : # pragma: nocover
311
+ # somehow we aren't in the childrens -- probably we are
312
+ # recovering from a session failure and our ephemeral
313
+ # node was removed.
314
+ raise ForceRetryError ()
315
+
316
+ predecessor = None
317
+ # Sort the contenders using the sequence number extracted by the regex,
318
+ # then extract the original string.
319
+ for match in sorted (contender_matches , key = lambda m : m .groups ()):
320
+ if match is found_self :
321
+ break
322
+ predecessor = match .string
303
323
304
- # Node names are prefixed by a type: strip the prefix first, which may
305
- # be one of multiple values in case of a read-write lock, and return
306
- # only the sequence number (as a string since it is padded and will
307
- # sort correctly anyway).
308
- #
309
- # In some cases, the lock path may contain nodes with other prefixes
310
- # (eg. in case of a lease), just sort them last ('~' sorts after all
311
- # ASCII digits).
312
- def _seq (c ):
313
- for name in self ._exclude_names :
314
- idx = c .find (name )
315
- if idx != - 1 :
316
- return c [idx + len (name ):]
317
- # Sort unknown node names eg. "lease_holder" last.
318
- return '~'
319
-
320
- children .sort (key = _seq )
321
- return children
324
+ return predecessor
322
325
323
326
def _find_node (self ):
324
327
children = self .client .get_children (self .path )
@@ -369,16 +372,37 @@ def contenders(self):
369
372
if not self .assured_path :
370
373
self ._ensure_path ()
371
374
372
- children = self ._get_sorted_children ()
373
-
374
- contenders = []
375
+ children = self .client .get_children (self .path )
376
+ # We want all contenders, including self (this is especially important
377
+ # for r/w locks). This is similar to the logic of `_get_predecessor`
378
+ # except we include our own pattern.
379
+ all_contenders_re = re .compile (
380
+ r"(?:{patterns})(-?\d{{10}})$" .format (
381
+ patterns = "|" .join (self ._exclude_names | {self ._NODE_NAME })
382
+ )
383
+ )
384
+ # Filter out the contenders using the computed regex
385
+ contender_matches = []
375
386
for child in children :
387
+ match = all_contenders_re .search (child )
388
+ if match is not None :
389
+ contender_matches .append (match )
390
+ # Sort the contenders using the sequence number extracted by the regex,
391
+ # then extract the original string.
392
+ contender_nodes = [
393
+ match .string
394
+ for match in sorted (contender_matches , key = lambda m : m .groups ())
395
+ ]
396
+ # Retrieve all the contender nodes data (preserving order).
397
+ contenders = []
398
+ for node in contender_nodes :
376
399
try :
377
- data , stat = self .client .get (self .path + "/" + child )
400
+ data , stat = self .client .get (self .path + "/" + node )
378
401
if data is not None :
379
- contenders .append (data .decode (' utf-8' ))
402
+ contenders .append (data .decode (" utf-8" ))
380
403
except NoNodeError : # pragma: nocover
381
404
pass
405
+
382
406
return contenders
383
407
384
408
def __enter__ (self ):
@@ -508,12 +532,12 @@ def __init__(self, client, path, identifier=None, max_leases=1):
508
532
509
533
# some data is written to the node. this can be queried via
510
534
# contenders() to see who is contending for the lock
511
- self .data = str (identifier or "" ).encode (' utf-8' )
535
+ self .data = str (identifier or "" ).encode (" utf-8" )
512
536
self .max_leases = max_leases
513
537
self .wake_event = client .handler .event_object ()
514
538
515
539
self .create_path = self .path + "/" + uuid .uuid4 ().hex
516
- self .lock_path = path + '-' + ' __lock__'
540
+ self .lock_path = path + "-" + " __lock__"
517
541
self .is_acquired = False
518
542
self .assured_path = False
519
543
self .cancelled = False
@@ -526,7 +550,7 @@ def _ensure_path(self):
526
550
# node did already exist
527
551
data , _ = self .client .get (self .path )
528
552
try :
529
- leases = int (data .decode (' utf-8' ))
553
+ leases = int (data .decode (" utf-8" ))
530
554
except (ValueError , TypeError ):
531
555
# ignore non-numeric data, maybe the node data is used
532
556
# for other purposes
@@ -538,7 +562,7 @@ def _ensure_path(self):
538
562
% (leases , self .max_leases )
539
563
)
540
564
else :
541
- self .client .set (self .path , str (self .max_leases ).encode (' utf-8' ))
565
+ self .client .set (self .path , str (self .max_leases ).encode (" utf-8" ))
542
566
543
567
def cancel (self ):
544
568
"""Cancel a pending semaphore acquire."""
@@ -702,7 +726,7 @@ def lease_holders(self):
702
726
for child in children :
703
727
try :
704
728
data , stat = self .client .get (self .path + "/" + child )
705
- lease_holders .append (data .decode (' utf-8' ))
729
+ lease_holders .append (data .decode (" utf-8" ))
706
730
except NoNodeError : # pragma: nocover
707
731
pass
708
732
return lease_holders
0 commit comments