14
14
and/or the lease has been lost.
15
15
16
16
"""
17
+ from __future__ import annotations
18
+
17
19
import re
18
20
import sys
21
+ from typing import TYPE_CHECKING , cast , ContextManager
22
+ import uuid
19
23
20
24
try :
21
25
from time import monotonic as now
22
26
except ImportError :
23
27
from time import time as now
24
- import uuid
25
28
26
29
import six
27
30
38
41
RetryFailedError ,
39
42
)
40
43
44
+ if TYPE_CHECKING :
45
+ from kazoo .client import KazooClient
46
+ from kazoo .protocol .states import WatchedEvent
47
+ from typing import (
48
+ Any ,
49
+ List ,
50
+ Optional ,
51
+ Sequence ,
52
+ Type ,
53
+ Union ,
54
+ )
55
+ from typing_extensions import Literal
56
+
41
57
42
58
class _Watch (object ):
43
- def __init__ (self , duration = None ):
59
+ def __init__ (self , duration : Optional [ float ] = None ):
44
60
self .duration = duration
45
- self .started_at = None
61
+ self .started_at : Optional [ float ] = None
46
62
47
- def start (self ):
63
+ def start (self ) -> None :
48
64
self .started_at = now ()
49
65
50
- def leftover (self ):
66
+ def leftover (self ) -> Optional [ float ] :
51
67
if self .duration is None :
52
68
return None
53
69
else :
54
- elapsed = now () - self .started_at
70
+ elapsed = now () - cast ( float , self .started_at )
55
71
return max (0 , self .duration - elapsed )
56
72
57
73
58
- class Lock (object ):
74
+ class Lock (ContextManager [ None ] ):
59
75
"""Kazoo Lock
60
76
61
77
Example usage with a :class:`~kazoo.client.KazooClient` instance:
@@ -84,7 +100,13 @@ class Lock(object):
84
100
# sequence number. Involved in read/write locks.
85
101
_EXCLUDE_NAMES = ["__lock__" ]
86
102
87
- def __init__ (self , client , path , identifier = None , extra_lock_patterns = ()):
103
+ def __init__ (
104
+ self ,
105
+ client : KazooClient ,
106
+ path : str ,
107
+ identifier : Optional [str ] = None ,
108
+ extra_lock_patterns : Sequence [str ] = (),
109
+ ):
88
110
"""Create a Kazoo lock.
89
111
90
112
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -116,7 +138,7 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
116
138
# some data is written to the node. this can be queried via
117
139
# contenders() to see who is contending for the lock
118
140
self .data = str (identifier or "" ).encode ("utf-8" )
119
- self .node = None
141
+ self .node : Optional [ str ] = None
120
142
121
143
self .wake_event = client .handler .event_object ()
122
144
@@ -132,20 +154,25 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
132
154
self .assured_path = False
133
155
self .cancelled = False
134
156
self ._retry = KazooRetry (
135
- max_tries = None , sleep_func = client .handler .sleep_func
157
+ max_tries = - 1 , sleep_func = client .handler .sleep_func
136
158
)
137
159
self ._acquire_method_lock = client .handler .lock_object ()
138
160
139
- def _ensure_path (self ):
161
+ def _ensure_path (self ) -> None :
140
162
self .client .ensure_path (self .path )
141
163
self .assured_path = True
142
164
143
- def cancel (self ):
165
+ def cancel (self ) -> None :
144
166
"""Cancel a pending lock acquire."""
145
167
self .cancelled = True
146
168
self .wake_event .set ()
147
169
148
- def acquire (self , blocking = True , timeout = None , ephemeral = True ):
170
+ def acquire (
171
+ self ,
172
+ blocking : bool = True ,
173
+ timeout : Optional [float ] = None ,
174
+ ephemeral : bool = True ,
175
+ ) -> bool :
149
176
"""
150
177
Acquire the lock. By defaults blocks and waits forever.
151
178
@@ -212,11 +239,13 @@ def acquire(self, blocking=True, timeout=None, ephemeral=True):
212
239
finally :
213
240
self ._acquire_method_lock .release ()
214
241
215
- def _watch_session (self , state ) :
242
+ def _watch_session (self , state : KazooState ) -> bool :
216
243
self .wake_event .set ()
217
244
return True
218
245
219
- def _inner_acquire (self , blocking , timeout , ephemeral = True ):
246
+ def _inner_acquire (
247
+ self , blocking : bool , timeout : Optional [float ], ephemeral : bool = True
248
+ ) -> bool :
220
249
221
250
# wait until it's our chance to get it..
222
251
if self .is_acquired :
@@ -274,10 +303,10 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
274
303
finally :
275
304
self .client .remove_listener (self ._watch_session )
276
305
277
- def _watch_predecessor (self , event ) :
306
+ def _watch_predecessor (self , event : WatchedEvent ) -> None :
278
307
self .wake_event .set ()
279
308
280
- def _get_predecessor (self , node ) :
309
+ def _get_predecessor (self , node : str ) -> Optional [ str ] :
281
310
"""returns `node`'s predecessor or None
282
311
283
312
Note: This handle the case where the current lock is not a contender
@@ -286,7 +315,7 @@ def _get_predecessor(self, node):
286
315
"""
287
316
node_sequence = node [len (self .prefix ) :]
288
317
children = self .client .get_children (self .path )
289
- found_self = False
318
+ found_self : Union [ Literal [ False ], re . Match [ bytes ]] = False
290
319
# Filter out the contenders using the computed regex
291
320
contender_matches = []
292
321
for child in children :
@@ -301,7 +330,7 @@ def _get_predecessor(self, node):
301
330
if child == node :
302
331
# Remember the node's match object so we can short circuit
303
332
# below.
304
- found_self = match
333
+ found_self = cast ( re . Match [ bytes ], match )
305
334
306
335
if found_self is False : # pragma: nocover
307
336
# somehow we aren't in the childrens -- probably we are
@@ -317,42 +346,42 @@ def _get_predecessor(self, node):
317
346
sorted_matches = sorted (contender_matches , key = lambda m : m .groups ())
318
347
return sorted_matches [- 1 ].string
319
348
320
- def _find_node (self ):
349
+ def _find_node (self ) -> Optional [ str ] :
321
350
children = self .client .get_children (self .path )
322
351
for child in children :
323
352
if child .startswith (self .prefix ):
324
353
return child
325
354
return None
326
355
327
- def _delete_node (self , node ) :
356
+ def _delete_node (self , node : str ) -> None :
328
357
self .client .delete (self .path + "/" + node )
329
358
330
- def _best_effort_cleanup (self ):
359
+ def _best_effort_cleanup (self ) -> None :
331
360
try :
332
361
node = self .node or self ._find_node ()
333
362
if node :
334
363
self ._delete_node (node )
335
364
except KazooException : # pragma: nocover
336
365
pass
337
366
338
- def release (self ):
367
+ def release (self ) -> bool :
339
368
"""Release the lock immediately."""
340
369
return self .client .retry (self ._inner_release )
341
370
342
- def _inner_release (self ):
371
+ def _inner_release (self ) -> bool :
343
372
if not self .is_acquired :
344
373
return False
345
374
346
375
try :
347
- self ._delete_node (self .node )
376
+ self ._delete_node (cast ( str , self .node ) )
348
377
except NoNodeError : # pragma: nocover
349
378
pass
350
379
351
380
self .is_acquired = False
352
381
self .node = None
353
382
return True
354
383
355
- def contenders (self ):
384
+ def contenders (self ) -> List [ str ] :
356
385
"""Return an ordered list of the current contenders for the
357
386
lock.
358
387
@@ -388,7 +417,7 @@ def contenders(self):
388
417
for match in sorted (contender_matches , key = lambda m : m .groups ())
389
418
]
390
419
# Retrieve all the contender nodes data (preserving order).
391
- contenders = []
420
+ contenders : List [ str ] = []
392
421
for node in contender_nodes :
393
422
try :
394
423
data , stat = self .client .get (self .path + "/" + node )
@@ -399,10 +428,15 @@ def contenders(self):
399
428
400
429
return contenders
401
430
402
- def __enter__ (self ):
431
+ def __enter__ (self ) -> None :
403
432
self .acquire ()
404
433
405
- def __exit__ (self , exc_type , exc_value , traceback ):
434
+ def __exit__ (
435
+ self ,
436
+ exc_type : Optional [Type [BaseException ]],
437
+ exc_value : Optional [BaseException ],
438
+ traceback : Any ,
439
+ ) -> None :
406
440
self .release ()
407
441
408
442
@@ -466,7 +500,7 @@ class ReadLock(Lock):
466
500
_EXCLUDE_NAMES = ["__lock__" ]
467
501
468
502
469
- class Semaphore (object ):
503
+ class Semaphore (ContextManager [ None ] ):
470
504
"""A Zookeeper-based Semaphore
471
505
472
506
This synchronization primitive operates in the same manner as the
@@ -501,7 +535,13 @@ class Semaphore(object):
501
535
502
536
"""
503
537
504
- def __init__ (self , client , path , identifier = None , max_leases = 1 ):
538
+ def __init__ (
539
+ self ,
540
+ client : KazooClient ,
541
+ path : str ,
542
+ identifier : Optional [str ] = None ,
543
+ max_leases : int = 1 ,
544
+ ):
505
545
"""Create a Kazoo Lock
506
546
507
547
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -537,7 +577,7 @@ def __init__(self, client, path, identifier=None, max_leases=1):
537
577
self .cancelled = False
538
578
self ._session_expired = False
539
579
540
- def _ensure_path (self ):
580
+ def _ensure_path (self ) -> None :
541
581
result = self .client .ensure_path (self .path )
542
582
self .assured_path = True
543
583
if result is True :
@@ -558,12 +598,14 @@ def _ensure_path(self):
558
598
else :
559
599
self .client .set (self .path , str (self .max_leases ).encode ("utf-8" ))
560
600
561
- def cancel (self ):
601
+ def cancel (self ) -> None :
562
602
"""Cancel a pending semaphore acquire."""
563
603
self .cancelled = True
564
604
self .wake_event .set ()
565
605
566
- def acquire (self , blocking = True , timeout = None ):
606
+ def acquire (
607
+ self , blocking : bool = True , timeout : Optional [float ] = None
608
+ ) -> bool :
567
609
"""Acquire the semaphore. By defaults blocks and waits forever.
568
610
569
611
:param blocking: Block until semaphore is obtained or
@@ -601,7 +643,9 @@ def acquire(self, blocking=True, timeout=None):
601
643
602
644
return self .is_acquired
603
645
604
- def _inner_acquire (self , blocking , timeout = None ):
646
+ def _inner_acquire (
647
+ self , blocking : bool , timeout : Optional [float ] = None
648
+ ) -> bool :
605
649
"""Inner loop that runs from the top anytime a command hits a
606
650
retryable Zookeeper exception."""
607
651
self ._session_expired = False
@@ -642,10 +686,10 @@ def _inner_acquire(self, blocking, timeout=None):
642
686
finally :
643
687
lock .release ()
644
688
645
- def _watch_lease_change (self , event ) :
689
+ def _watch_lease_change (self , event : WatchedEvent ) -> None :
646
690
self .wake_event .set ()
647
691
648
- def _get_lease (self , data = None ) :
692
+ def _get_lease (self ) -> bool :
649
693
# Make sure the session is still valid
650
694
if self ._session_expired :
651
695
raise ForceRetryError ("Retry on session loss at top" )
@@ -674,25 +718,26 @@ def _get_lease(self, data=None):
674
718
# Return current state
675
719
return self .is_acquired
676
720
677
- def _watch_session (self , state ) :
721
+ def _watch_session (self , state : KazooState ) -> Optional [ Literal [ True ]] :
678
722
if state == KazooState .LOST :
679
723
self ._session_expired = True
680
724
self .wake_event .set ()
681
725
682
726
# Return true to de-register
683
727
return True
728
+ return None
684
729
685
- def _best_effort_cleanup (self ):
730
+ def _best_effort_cleanup (self ) -> None :
686
731
try :
687
732
self .client .delete (self .create_path )
688
733
except KazooException : # pragma: nocover
689
734
pass
690
735
691
- def release (self ):
736
+ def release (self ) -> bool :
692
737
"""Release the lease immediately."""
693
738
return self .client .retry (self ._inner_release )
694
739
695
- def _inner_release (self ):
740
+ def _inner_release (self ) -> bool :
696
741
if not self .is_acquired :
697
742
return False
698
743
try :
@@ -702,7 +747,7 @@ def _inner_release(self):
702
747
self .is_acquired = False
703
748
return True
704
749
705
- def lease_holders (self ):
750
+ def lease_holders (self ) -> List [ str ] :
706
751
"""Return an unordered list of the current lease holders.
707
752
708
753
.. note::
@@ -716,7 +761,7 @@ def lease_holders(self):
716
761
717
762
children = self .client .get_children (self .path )
718
763
719
- lease_holders = []
764
+ lease_holders : List [ str ] = []
720
765
for child in children :
721
766
try :
722
767
data , stat = self .client .get (self .path + "/" + child )
@@ -725,8 +770,13 @@ def lease_holders(self):
725
770
pass
726
771
return lease_holders
727
772
728
- def __enter__ (self ):
773
+ def __enter__ (self ) -> None :
729
774
self .acquire ()
730
775
731
- def __exit__ (self , exc_type , exc_value , traceback ):
776
+ def __exit__ (
777
+ self ,
778
+ exc_type : Optional [Type [BaseException ]],
779
+ exc_value : Optional [BaseException ],
780
+ traceback : Any ,
781
+ ) -> None :
732
782
self .release ()
0 commit comments