15
15
16
16
"""
17
17
import re
18
- from time import monotonic as now
19
18
import uuid
19
+ from time import monotonic as now
20
+ from typing import TYPE_CHECKING , Any , ContextManager , List , Optional , Sequence , Type , Union , cast
21
+
22
+ from kazoo .exceptions import (CancelledError , KazooException , LockTimeout ,
23
+ NoNodeError )
24
+ from kazoo .protocol .states import KazooState , WatchedEvent
25
+ from kazoo .retry import ForceRetryError , KazooRetry , RetryFailedError
20
26
21
- from kazoo .exceptions import (
22
- CancelledError ,
23
- KazooException ,
24
- LockTimeout ,
25
- NoNodeError ,
26
- )
27
- from kazoo .protocol .states import KazooState
28
- from kazoo .retry import (
29
- ForceRetryError ,
30
- KazooRetry ,
31
- RetryFailedError ,
32
- )
27
+ if TYPE_CHECKING :
28
+ from kazoo .client import KazooClient
29
+ from typing_extensions import Literal
33
30
34
31
35
32
class _Watch (object ):
36
- def __init__ (self , duration = None ):
33
+ def __init__ (self , duration : Optional [ float ] = None ):
37
34
self .duration = duration
38
- self .started_at = None
35
+ self .started_at : Optional [ float ] = None
39
36
40
- def start (self ):
37
+ def start (self ) -> None :
41
38
self .started_at = now ()
42
39
43
- def leftover (self ):
40
+ def leftover (self ) -> Optional [ float ] :
44
41
if self .duration is None :
45
42
return None
46
43
else :
47
- elapsed = now () - self .started_at
44
+ elapsed = now () - cast ( float , self .started_at )
48
45
return max (0 , self .duration - elapsed )
49
46
50
47
51
- class Lock (object ):
48
+ class Lock (ContextManager [ None ] ):
52
49
"""Kazoo Lock
53
50
54
51
Example usage with a :class:`~kazoo.client.KazooClient` instance:
@@ -77,7 +74,13 @@ class Lock(object):
77
74
# sequence number. Involved in read/write locks.
78
75
_EXCLUDE_NAMES = ["__lock__" ]
79
76
80
- def __init__ (self , client , path , identifier = None , extra_lock_patterns = ()):
77
+ def __init__ (
78
+ self ,
79
+ client : KazooClient ,
80
+ path : str ,
81
+ identifier : Optional [str ] = None ,
82
+ extra_lock_patterns : Sequence [str ] = (),
83
+ ):
81
84
"""Create a Kazoo lock.
82
85
83
86
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -109,7 +112,7 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
109
112
# some data is written to the node. this can be queried via
110
113
# contenders() to see who is contending for the lock
111
114
self .data = str (identifier or "" ).encode ("utf-8" )
112
- self .node = None
115
+ self .node : Optional [ str ] = None
113
116
114
117
self .wake_event = client .handler .event_object ()
115
118
@@ -125,20 +128,25 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
125
128
self .assured_path = False
126
129
self .cancelled = False
127
130
self ._retry = KazooRetry (
128
- max_tries = None , sleep_func = client .handler .sleep_func
131
+ max_tries = - 1 , sleep_func = client .handler .sleep_func
129
132
)
130
133
self ._acquire_method_lock = client .handler .lock_object ()
131
134
132
- def _ensure_path (self ):
135
+ def _ensure_path (self ) -> None :
133
136
self .client .ensure_path (self .path )
134
137
self .assured_path = True
135
138
136
- def cancel (self ):
139
+ def cancel (self ) -> None :
137
140
"""Cancel a pending lock acquire."""
138
141
self .cancelled = True
139
142
self .wake_event .set ()
140
143
141
- def acquire (self , blocking = True , timeout = None , ephemeral = True ):
144
+ def acquire (
145
+ self ,
146
+ blocking : bool = True ,
147
+ timeout : Optional [float ] = None ,
148
+ ephemeral : bool = True ,
149
+ ) -> bool :
142
150
"""
143
151
Acquire the lock. By defaults blocks and waits forever.
144
152
@@ -204,11 +212,13 @@ def acquire(self, blocking=True, timeout=None, ephemeral=True):
204
212
finally :
205
213
self ._acquire_method_lock .release ()
206
214
207
- def _watch_session (self , state ) :
215
+ def _watch_session (self , state : KazooState ) -> bool :
208
216
self .wake_event .set ()
209
217
return True
210
218
211
- def _inner_acquire (self , blocking , timeout , ephemeral = True ):
219
+ def _inner_acquire (
220
+ self , blocking : bool , timeout : Optional [float ], ephemeral : bool = True
221
+ ) -> bool :
212
222
213
223
# wait until it's our chance to get it..
214
224
if self .is_acquired :
@@ -266,10 +276,10 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
266
276
finally :
267
277
self .client .remove_listener (self ._watch_session )
268
278
269
- def _watch_predecessor (self , event ) :
279
+ def _watch_predecessor (self , event : WatchedEvent ) -> None :
270
280
self .wake_event .set ()
271
281
272
- def _get_predecessor (self , node ) :
282
+ def _get_predecessor (self , node : str ) -> Optional [ str ] :
273
283
"""returns `node`'s predecessor or None
274
284
275
285
Note: This handle the case where the current lock is not a contender
@@ -278,7 +288,7 @@ def _get_predecessor(self, node):
278
288
"""
279
289
node_sequence = node [len (self .prefix ) :]
280
290
children = self .client .get_children (self .path )
281
- found_self = False
291
+ found_self : Union [ Literal [ False ], re . Match [ bytes ]] = False
282
292
# Filter out the contenders using the computed regex
283
293
contender_matches = []
284
294
for child in children :
@@ -293,7 +303,7 @@ def _get_predecessor(self, node):
293
303
if child == node :
294
304
# Remember the node's match object so we can short circuit
295
305
# below.
296
- found_self = match
306
+ found_self = cast ( re . Match [ bytes ], match )
297
307
298
308
if found_self is False : # pragma: nocover
299
309
# somehow we aren't in the childrens -- probably we are
@@ -309,42 +319,42 @@ def _get_predecessor(self, node):
309
319
sorted_matches = sorted (contender_matches , key = lambda m : m .groups ())
310
320
return sorted_matches [- 1 ].string
311
321
312
- def _find_node (self ):
322
+ def _find_node (self ) -> Optional [ str ] :
313
323
children = self .client .get_children (self .path )
314
324
for child in children :
315
325
if child .startswith (self .prefix ):
316
326
return child
317
327
return None
318
328
319
- def _delete_node (self , node ) :
329
+ def _delete_node (self , node : str ) -> None :
320
330
self .client .delete (self .path + "/" + node )
321
331
322
- def _best_effort_cleanup (self ):
332
+ def _best_effort_cleanup (self ) -> None :
323
333
try :
324
334
node = self .node or self ._find_node ()
325
335
if node :
326
336
self ._delete_node (node )
327
337
except KazooException : # pragma: nocover
328
338
pass
329
339
330
- def release (self ):
340
+ def release (self ) -> bool :
331
341
"""Release the lock immediately."""
332
342
return self .client .retry (self ._inner_release )
333
343
334
- def _inner_release (self ):
344
+ def _inner_release (self ) -> bool :
335
345
if not self .is_acquired :
336
346
return False
337
347
338
348
try :
339
- self ._delete_node (self .node )
349
+ self ._delete_node (cast ( str , self .node ) )
340
350
except NoNodeError : # pragma: nocover
341
351
pass
342
352
343
353
self .is_acquired = False
344
354
self .node = None
345
355
return True
346
356
347
- def contenders (self ):
357
+ def contenders (self ) -> List [ str ] :
348
358
"""Return an ordered list of the current contenders for the
349
359
lock.
350
360
@@ -380,7 +390,7 @@ def contenders(self):
380
390
for match in sorted (contender_matches , key = lambda m : m .groups ())
381
391
]
382
392
# Retrieve all the contender nodes data (preserving order).
383
- contenders = []
393
+ contenders : List [ str ] = []
384
394
for node in contender_nodes :
385
395
try :
386
396
data , stat = self .client .get (self .path + "/" + node )
@@ -391,10 +401,10 @@ def contenders(self):
391
401
392
402
return contenders
393
403
394
- def __enter__ (self ):
404
+ def __enter__ (self ) -> None :
395
405
self .acquire ()
396
406
397
- def __exit__ (self , exc_type , exc_value , traceback ) :
407
+ def __exit__ (self , exc_type : Optional [ Type [ BaseException ]] , exc_value : Optional [ BaseException ] , traceback : Any ) -> None :
398
408
self .release ()
399
409
400
410
@@ -458,7 +468,7 @@ class ReadLock(Lock):
458
468
_EXCLUDE_NAMES = ["__lock__" ]
459
469
460
470
461
- class Semaphore (object ):
471
+ class Semaphore (ContextManager [ None ] ):
462
472
"""A Zookeeper-based Semaphore
463
473
464
474
This synchronization primitive operates in the same manner as the
@@ -493,7 +503,13 @@ class Semaphore(object):
493
503
494
504
"""
495
505
496
- def __init__ (self , client , path , identifier = None , max_leases = 1 ):
506
+ def __init__ (
507
+ self ,
508
+ client : KazooClient ,
509
+ path : str ,
510
+ identifier : Optional [str ] = None ,
511
+ max_leases : int = 1 ,
512
+ ):
497
513
"""Create a Kazoo Lock
498
514
499
515
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -529,7 +545,7 @@ def __init__(self, client, path, identifier=None, max_leases=1):
529
545
self .cancelled = False
530
546
self ._session_expired = False
531
547
532
- def _ensure_path (self ):
548
+ def _ensure_path (self ) -> None :
533
549
result = self .client .ensure_path (self .path )
534
550
self .assured_path = True
535
551
if result is True :
@@ -550,12 +566,14 @@ def _ensure_path(self):
550
566
else :
551
567
self .client .set (self .path , str (self .max_leases ).encode ("utf-8" ))
552
568
553
- def cancel (self ):
569
+ def cancel (self ) -> None :
554
570
"""Cancel a pending semaphore acquire."""
555
571
self .cancelled = True
556
572
self .wake_event .set ()
557
573
558
- def acquire (self , blocking = True , timeout = None ):
574
+ def acquire (
575
+ self , blocking : bool = True , timeout : Optional [float ] = None
576
+ ) -> bool :
559
577
"""Acquire the semaphore. By defaults blocks and waits forever.
560
578
561
579
:param blocking: Block until semaphore is obtained or
@@ -593,7 +611,7 @@ def acquire(self, blocking=True, timeout=None):
593
611
594
612
return self .is_acquired
595
613
596
- def _inner_acquire (self , blocking , timeout = None ):
614
+ def _inner_acquire (self , blocking : bool , timeout : Optional [ float ] = None ) -> bool :
597
615
"""Inner loop that runs from the top anytime a command hits a
598
616
retryable Zookeeper exception."""
599
617
self ._session_expired = False
@@ -634,10 +652,10 @@ def _inner_acquire(self, blocking, timeout=None):
634
652
finally :
635
653
lock .release ()
636
654
637
- def _watch_lease_change (self , event ) :
655
+ def _watch_lease_change (self , event : WatchedEvent ) -> None :
638
656
self .wake_event .set ()
639
657
640
- def _get_lease (self , data = None ) :
658
+ def _get_lease (self ) -> bool :
641
659
# Make sure the session is still valid
642
660
if self ._session_expired :
643
661
raise ForceRetryError ("Retry on session loss at top" )
@@ -666,25 +684,26 @@ def _get_lease(self, data=None):
666
684
# Return current state
667
685
return self .is_acquired
668
686
669
- def _watch_session (self , state ) :
687
+ def _watch_session (self , state : KazooState ) -> Optional [ Literal [ True ]] :
670
688
if state == KazooState .LOST :
671
689
self ._session_expired = True
672
690
self .wake_event .set ()
673
691
674
692
# Return true to de-register
675
693
return True
694
+ return None
676
695
677
- def _best_effort_cleanup (self ):
696
+ def _best_effort_cleanup (self ) -> None :
678
697
try :
679
698
self .client .delete (self .create_path )
680
699
except KazooException : # pragma: nocover
681
700
pass
682
701
683
- def release (self ):
702
+ def release (self ) -> bool :
684
703
"""Release the lease immediately."""
685
704
return self .client .retry (self ._inner_release )
686
705
687
- def _inner_release (self ):
706
+ def _inner_release (self ) -> bool :
688
707
if not self .is_acquired :
689
708
return False
690
709
try :
@@ -694,7 +713,7 @@ def _inner_release(self):
694
713
self .is_acquired = False
695
714
return True
696
715
697
- def lease_holders (self ):
716
+ def lease_holders (self ) -> List [ str ] :
698
717
"""Return an unordered list of the current lease holders.
699
718
700
719
.. note::
@@ -708,7 +727,7 @@ def lease_holders(self):
708
727
709
728
children = self .client .get_children (self .path )
710
729
711
- lease_holders = []
730
+ lease_holders : List [ str ] = []
712
731
for child in children :
713
732
try :
714
733
data , stat = self .client .get (self .path + "/" + child )
@@ -717,8 +736,8 @@ def lease_holders(self):
717
736
pass
718
737
return lease_holders
719
738
720
- def __enter__ (self ):
739
+ def __enter__ (self ) -> None :
721
740
self .acquire ()
722
741
723
- def __exit__ (self , exc_type , exc_value , traceback ) :
742
+ def __exit__ (self , exc_type : Optional [ Type [ BaseException ]] , exc_value : Optional [ BaseException ] , traceback : Any ) -> None :
724
743
self .release ()
0 commit comments