15
15
16
16
"""
17
17
import re
18
+ import uuid
18
19
import sys
19
20
20
21
try :
21
22
from time import monotonic as now
22
23
except ImportError :
23
24
from time import time as now
24
- import uuid
25
25
26
26
import six
27
+ from typing import (
28
+ TYPE_CHECKING ,
29
+ Any ,
30
+ ContextManager ,
31
+ List ,
32
+ Optional ,
33
+ Sequence ,
34
+ Type ,
35
+ Union ,
36
+ cast ,
37
+ )
27
38
28
39
from kazoo .exceptions import (
29
40
CancelledError ,
38
49
RetryFailedError ,
39
50
)
40
51
52
+ if TYPE_CHECKING :
53
+ from kazoo .client import KazooClient
54
+ from typing_extensions import Literal
55
+
41
56
42
57
class _Watch (object ):
43
- def __init__ (self , duration = None ):
58
+ def __init__ (self , duration : Optional [ float ] = None ):
44
59
self .duration = duration
45
- self .started_at = None
60
+ self .started_at : Optional [ float ] = None
46
61
47
- def start (self ):
62
+ def start (self ) -> None :
48
63
self .started_at = now ()
49
64
50
- def leftover (self ):
65
+ def leftover (self ) -> Optional [ float ] :
51
66
if self .duration is None :
52
67
return None
53
68
else :
54
- elapsed = now () - self .started_at
69
+ elapsed = now () - cast ( float , self .started_at )
55
70
return max (0 , self .duration - elapsed )
56
71
57
72
58
- class Lock (object ):
73
+ class Lock (ContextManager [ None ] ):
59
74
"""Kazoo Lock
60
75
61
76
Example usage with a :class:`~kazoo.client.KazooClient` instance:
@@ -84,7 +99,13 @@ class Lock(object):
84
99
# sequence number. Involved in read/write locks.
85
100
_EXCLUDE_NAMES = ["__lock__" ]
86
101
87
- def __init__ (self , client , path , identifier = None , extra_lock_patterns = ()):
102
+ def __init__ (
103
+ self ,
104
+ client : KazooClient ,
105
+ path : str ,
106
+ identifier : Optional [str ] = None ,
107
+ extra_lock_patterns : Sequence [str ] = (),
108
+ ):
88
109
"""Create a Kazoo lock.
89
110
90
111
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -116,7 +137,7 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
116
137
# some data is written to the node. this can be queried via
117
138
# contenders() to see who is contending for the lock
118
139
self .data = str (identifier or "" ).encode ("utf-8" )
119
- self .node = None
140
+ self .node : Optional [ str ] = None
120
141
121
142
self .wake_event = client .handler .event_object ()
122
143
@@ -132,20 +153,25 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
132
153
self .assured_path = False
133
154
self .cancelled = False
134
155
self ._retry = KazooRetry (
135
- max_tries = None , sleep_func = client .handler .sleep_func
156
+ max_tries = - 1 , sleep_func = client .handler .sleep_func
136
157
)
137
158
self ._acquire_method_lock = client .handler .lock_object ()
138
159
139
- def _ensure_path (self ):
160
+ def _ensure_path (self ) -> None :
140
161
self .client .ensure_path (self .path )
141
162
self .assured_path = True
142
163
143
- def cancel (self ):
164
+ def cancel (self ) -> None :
144
165
"""Cancel a pending lock acquire."""
145
166
self .cancelled = True
146
167
self .wake_event .set ()
147
168
148
- def acquire (self , blocking = True , timeout = None , ephemeral = True ):
169
+ def acquire (
170
+ self ,
171
+ blocking : bool = True ,
172
+ timeout : Optional [float ] = None ,
173
+ ephemeral : bool = True ,
174
+ ) -> bool :
149
175
"""
150
176
Acquire the lock. By defaults blocks and waits forever.
151
177
@@ -212,11 +238,13 @@ def acquire(self, blocking=True, timeout=None, ephemeral=True):
212
238
finally :
213
239
self ._acquire_method_lock .release ()
214
240
215
- def _watch_session (self , state ) :
241
+ def _watch_session (self , state : KazooState ) -> bool :
216
242
self .wake_event .set ()
217
243
return True
218
244
219
- def _inner_acquire (self , blocking , timeout , ephemeral = True ):
245
+ def _inner_acquire (
246
+ self , blocking : bool , timeout : Optional [float ], ephemeral : bool = True
247
+ ) -> bool :
220
248
221
249
# wait until it's our chance to get it..
222
250
if self .is_acquired :
@@ -274,10 +302,10 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
274
302
finally :
275
303
self .client .remove_listener (self ._watch_session )
276
304
277
- def _watch_predecessor (self , event ) :
305
+ def _watch_predecessor (self , event : WatchedEvent ) -> None :
278
306
self .wake_event .set ()
279
307
280
- def _get_predecessor (self , node ) :
308
+ def _get_predecessor (self , node : str ) -> Optional [ str ] :
281
309
"""returns `node`'s predecessor or None
282
310
283
311
Note: This handle the case where the current lock is not a contender
@@ -286,7 +314,7 @@ def _get_predecessor(self, node):
286
314
"""
287
315
node_sequence = node [len (self .prefix ) :]
288
316
children = self .client .get_children (self .path )
289
- found_self = False
317
+ found_self : Union [ Literal [ False ], re . Match [ bytes ]] = False
290
318
# Filter out the contenders using the computed regex
291
319
contender_matches = []
292
320
for child in children :
@@ -301,7 +329,7 @@ def _get_predecessor(self, node):
301
329
if child == node :
302
330
# Remember the node's match object so we can short circuit
303
331
# below.
304
- found_self = match
332
+ found_self = cast ( re . Match [ bytes ], match )
305
333
306
334
if found_self is False : # pragma: nocover
307
335
# somehow we aren't in the childrens -- probably we are
@@ -317,42 +345,42 @@ def _get_predecessor(self, node):
317
345
sorted_matches = sorted (contender_matches , key = lambda m : m .groups ())
318
346
return sorted_matches [- 1 ].string
319
347
320
- def _find_node (self ):
348
+ def _find_node (self ) -> Optional [ str ] :
321
349
children = self .client .get_children (self .path )
322
350
for child in children :
323
351
if child .startswith (self .prefix ):
324
352
return child
325
353
return None
326
354
327
- def _delete_node (self , node ) :
355
+ def _delete_node (self , node : str ) -> None :
328
356
self .client .delete (self .path + "/" + node )
329
357
330
- def _best_effort_cleanup (self ):
358
+ def _best_effort_cleanup (self ) -> None :
331
359
try :
332
360
node = self .node or self ._find_node ()
333
361
if node :
334
362
self ._delete_node (node )
335
363
except KazooException : # pragma: nocover
336
364
pass
337
365
338
- def release (self ):
366
+ def release (self ) -> bool :
339
367
"""Release the lock immediately."""
340
368
return self .client .retry (self ._inner_release )
341
369
342
- def _inner_release (self ):
370
+ def _inner_release (self ) -> bool :
343
371
if not self .is_acquired :
344
372
return False
345
373
346
374
try :
347
- self ._delete_node (self .node )
375
+ self ._delete_node (cast ( str , self .node ) )
348
376
except NoNodeError : # pragma: nocover
349
377
pass
350
378
351
379
self .is_acquired = False
352
380
self .node = None
353
381
return True
354
382
355
- def contenders (self ):
383
+ def contenders (self ) -> List [ str ] :
356
384
"""Return an ordered list of the current contenders for the
357
385
lock.
358
386
@@ -388,7 +416,7 @@ def contenders(self):
388
416
for match in sorted (contender_matches , key = lambda m : m .groups ())
389
417
]
390
418
# Retrieve all the contender nodes data (preserving order).
391
- contenders = []
419
+ contenders : List [ str ] = []
392
420
for node in contender_nodes :
393
421
try :
394
422
data , stat = self .client .get (self .path + "/" + node )
@@ -399,10 +427,15 @@ def contenders(self):
399
427
400
428
return contenders
401
429
402
- def __enter__ (self ):
430
+ def __enter__ (self ) -> None :
403
431
self .acquire ()
404
432
405
- def __exit__ (self , exc_type , exc_value , traceback ):
433
+ def __exit__ (
434
+ self ,
435
+ exc_type : Optional [Type [BaseException ]],
436
+ exc_value : Optional [BaseException ],
437
+ traceback : Any ,
438
+ ) -> None :
406
439
self .release ()
407
440
408
441
@@ -466,7 +499,7 @@ class ReadLock(Lock):
466
499
_EXCLUDE_NAMES = ["__lock__" ]
467
500
468
501
469
- class Semaphore (object ):
502
+ class Semaphore (ContextManager [ None ] ):
470
503
"""A Zookeeper-based Semaphore
471
504
472
505
This synchronization primitive operates in the same manner as the
@@ -501,7 +534,13 @@ class Semaphore(object):
501
534
502
535
"""
503
536
504
- def __init__ (self , client , path , identifier = None , max_leases = 1 ):
537
+ def __init__ (
538
+ self ,
539
+ client : KazooClient ,
540
+ path : str ,
541
+ identifier : Optional [str ] = None ,
542
+ max_leases : int = 1 ,
543
+ ):
505
544
"""Create a Kazoo Lock
506
545
507
546
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -537,7 +576,7 @@ def __init__(self, client, path, identifier=None, max_leases=1):
537
576
self .cancelled = False
538
577
self ._session_expired = False
539
578
540
- def _ensure_path (self ):
579
+ def _ensure_path (self ) -> None :
541
580
result = self .client .ensure_path (self .path )
542
581
self .assured_path = True
543
582
if result is True :
@@ -558,12 +597,14 @@ def _ensure_path(self):
558
597
else :
559
598
self .client .set (self .path , str (self .max_leases ).encode ("utf-8" ))
560
599
561
- def cancel (self ):
600
+ def cancel (self ) -> None :
562
601
"""Cancel a pending semaphore acquire."""
563
602
self .cancelled = True
564
603
self .wake_event .set ()
565
604
566
- def acquire (self , blocking = True , timeout = None ):
605
+ def acquire (
606
+ self , blocking : bool = True , timeout : Optional [float ] = None
607
+ ) -> bool :
567
608
"""Acquire the semaphore. By defaults blocks and waits forever.
568
609
569
610
:param blocking: Block until semaphore is obtained or
@@ -601,7 +642,9 @@ def acquire(self, blocking=True, timeout=None):
601
642
602
643
return self .is_acquired
603
644
604
- def _inner_acquire (self , blocking , timeout = None ):
645
+ def _inner_acquire (
646
+ self , blocking : bool , timeout : Optional [float ] = None
647
+ ) -> bool :
605
648
"""Inner loop that runs from the top anytime a command hits a
606
649
retryable Zookeeper exception."""
607
650
self ._session_expired = False
@@ -642,10 +685,10 @@ def _inner_acquire(self, blocking, timeout=None):
642
685
finally :
643
686
lock .release ()
644
687
645
- def _watch_lease_change (self , event ) :
688
+ def _watch_lease_change (self , event : WatchedEvent ) -> None :
646
689
self .wake_event .set ()
647
690
648
- def _get_lease (self , data = None ) :
691
+ def _get_lease (self ) -> bool :
649
692
# Make sure the session is still valid
650
693
if self ._session_expired :
651
694
raise ForceRetryError ("Retry on session loss at top" )
@@ -674,25 +717,26 @@ def _get_lease(self, data=None):
674
717
# Return current state
675
718
return self .is_acquired
676
719
677
- def _watch_session (self , state ) :
720
+ def _watch_session (self , state : KazooState ) -> Optional [ Literal [ True ]] :
678
721
if state == KazooState .LOST :
679
722
self ._session_expired = True
680
723
self .wake_event .set ()
681
724
682
725
# Return true to de-register
683
726
return True
727
+ return None
684
728
685
- def _best_effort_cleanup (self ):
729
+ def _best_effort_cleanup (self ) -> None :
686
730
try :
687
731
self .client .delete (self .create_path )
688
732
except KazooException : # pragma: nocover
689
733
pass
690
734
691
- def release (self ):
735
+ def release (self ) -> bool :
692
736
"""Release the lease immediately."""
693
737
return self .client .retry (self ._inner_release )
694
738
695
- def _inner_release (self ):
739
+ def _inner_release (self ) -> bool :
696
740
if not self .is_acquired :
697
741
return False
698
742
try :
@@ -702,7 +746,7 @@ def _inner_release(self):
702
746
self .is_acquired = False
703
747
return True
704
748
705
- def lease_holders (self ):
749
+ def lease_holders (self ) -> List [ str ] :
706
750
"""Return an unordered list of the current lease holders.
707
751
708
752
.. note::
@@ -716,7 +760,7 @@ def lease_holders(self):
716
760
717
761
children = self .client .get_children (self .path )
718
762
719
- lease_holders = []
763
+ lease_holders : List [ str ] = []
720
764
for child in children :
721
765
try :
722
766
data , stat = self .client .get (self .path + "/" + child )
@@ -725,8 +769,13 @@ def lease_holders(self):
725
769
pass
726
770
return lease_holders
727
771
728
- def __enter__ (self ):
772
+ def __enter__ (self ) -> None :
729
773
self .acquire ()
730
774
731
- def __exit__ (self , exc_type , exc_value , traceback ):
775
+ def __exit__ (
776
+ self ,
777
+ exc_type : Optional [Type [BaseException ]],
778
+ exc_value : Optional [BaseException ],
779
+ traceback : Any ,
780
+ ) -> None :
732
781
self .release ()
0 commit comments