15
15
16
16
"""
17
17
import re
18
- from time import monotonic as now
19
18
import uuid
19
+ from time import monotonic as now
20
+ from typing import TYPE_CHECKING, Any, ContextManager, List, Optional, Sequence, Type, Union, cast
21
+
22
+ from kazoo.exceptions import (CancelledError, KazooException, LockTimeout,
23
+ NoNodeError)
24
+ from kazoo.protocol.states import KazooState, WatchedEvent
25
+ from kazoo.retry import ForceRetryError, KazooRetry, RetryFailedError
20
26
21
- from kazoo.exceptions import (
22
- CancelledError,
23
- KazooException,
24
- LockTimeout,
25
- NoNodeError,
26
- )
27
- from kazoo.protocol.states import KazooState
28
- from kazoo.retry import (
29
- ForceRetryError,
30
- KazooRetry,
31
- RetryFailedError,
32
- )
27
+ if TYPE_CHECKING:
28
+ from kazoo.client import KazooClient
29
+ from typing_extensions import Literal
33
30
34
31
35
32
class _Watch(object):
36
- def __init__(self, duration= None):
33
+ def __init__(self, duration: Optional[float] = None):
37
34
self.duration = duration
38
- self.started_at = None
35
+ self.started_at: Optional[float] = None
39
36
40
- def start(self):
37
+ def start(self) -> None :
41
38
self.started_at = now()
42
39
43
- def leftover(self):
40
+ def leftover(self) -> Optional[float] :
44
41
if self.duration is None:
45
42
return None
46
43
else:
47
- elapsed = now() - self.started_at
44
+ elapsed = now() - cast(float, self.started_at)
48
45
return max(0, self.duration - elapsed)
49
46
50
47
51
- class Lock(object ):
48
+ class Lock(ContextManager[None] ):
52
49
"""Kazoo Lock
53
50
54
51
Example usage with a :class:`~kazoo.client.KazooClient` instance:
@@ -77,7 +74,13 @@ class Lock(object):
77
74
# sequence number. Involved in read/write locks.
78
75
_EXCLUDE_NAMES = ["__lock__"]
79
76
80
- def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
77
+ def __init__(
78
+ self,
79
+ client: KazooClient,
80
+ path: str,
81
+ identifier: Optional[str] = None,
82
+ extra_lock_patterns: Sequence[str] = (),
83
+ ):
81
84
"""Create a Kazoo lock.
82
85
83
86
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -109,7 +112,7 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
109
112
# some data is written to the node. this can be queried via
110
113
# contenders() to see who is contending for the lock
111
114
self.data = str(identifier or "").encode("utf-8")
112
- self.node = None
115
+ self.node: Optional[str] = None
113
116
114
117
self.wake_event = client.handler.event_object()
115
118
@@ -125,20 +128,25 @@ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
125
128
self.assured_path = False
126
129
self.cancelled = False
127
130
self._retry = KazooRetry(
128
- max_tries=None , sleep_func=client.handler.sleep_func
131
+ max_tries=-1 , sleep_func=client.handler.sleep_func
129
132
)
130
133
self._acquire_method_lock = client.handler.lock_object()
131
134
132
- def _ensure_path(self):
135
+ def _ensure_path(self) -> None :
133
136
self.client.ensure_path(self.path)
134
137
self.assured_path = True
135
138
136
- def cancel(self):
139
+ def cancel(self) -> None :
137
140
"""Cancel a pending lock acquire."""
138
141
self.cancelled = True
139
142
self.wake_event.set()
140
143
141
- def acquire(self, blocking=True, timeout=None, ephemeral=True):
144
+ def acquire(
145
+ self,
146
+ blocking: bool = True,
147
+ timeout: Optional[float] = None,
148
+ ephemeral: bool = True,
149
+ ) -> bool:
142
150
"""
143
151
Acquire the lock. By defaults blocks and waits forever.
144
152
@@ -204,11 +212,13 @@ def acquire(self, blocking=True, timeout=None, ephemeral=True):
204
212
finally:
205
213
self._acquire_method_lock.release()
206
214
207
- def _watch_session(self, state) :
215
+ def _watch_session(self, state: KazooState) -> bool :
208
216
self.wake_event.set()
209
217
return True
210
218
211
- def _inner_acquire(self, blocking, timeout, ephemeral=True):
219
+ def _inner_acquire(
220
+ self, blocking: bool, timeout: Optional[float], ephemeral: bool=True
221
+ ) -> bool:
212
222
213
223
# wait until it's our chance to get it..
214
224
if self.is_acquired:
@@ -266,10 +276,10 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
266
276
finally:
267
277
self.client.remove_listener(self._watch_session)
268
278
269
- def _watch_predecessor(self, event) :
279
+ def _watch_predecessor(self, event: WatchedEvent) -> None :
270
280
self.wake_event.set()
271
281
272
- def _get_predecessor(self, node) :
282
+ def _get_predecessor(self, node: str) -> Optional[str] :
273
283
"""returns `node`'s predecessor or None
274
284
275
285
Note: This handle the case where the current lock is not a contender
@@ -278,7 +288,7 @@ def _get_predecessor(self, node):
278
288
"""
279
289
node_sequence = node[len(self.prefix) :]
280
290
children = self.client.get_children(self.path)
281
- found_self = False
291
+ found_self: Union[Literal[False],re.Match[bytes]] = False
282
292
# Filter out the contenders using the computed regex
283
293
contender_matches = []
284
294
for child in children:
@@ -293,7 +303,7 @@ def _get_predecessor(self, node):
293
303
if child == node:
294
304
# Remember the node's match object so we can short circuit
295
305
# below.
296
- found_self = match
306
+ found_self = cast(re.Match[bytes], match)
297
307
298
308
if found_self is False: # pragma: nocover
299
309
# somehow we aren't in the childrens -- probably we are
@@ -309,42 +319,42 @@ def _get_predecessor(self, node):
309
319
sorted_matches = sorted(contender_matches, key=lambda m: m.groups())
310
320
return sorted_matches[-1].string
311
321
312
- def _find_node(self):
322
+ def _find_node(self) -> Optional[str] :
313
323
children = self.client.get_children(self.path)
314
324
for child in children:
315
325
if child.startswith(self.prefix):
316
326
return child
317
327
return None
318
328
319
- def _delete_node(self, node) :
329
+ def _delete_node(self, node: str) -> None :
320
330
self.client.delete(self.path + "/" + node)
321
331
322
- def _best_effort_cleanup(self):
332
+ def _best_effort_cleanup(self) -> None :
323
333
try:
324
334
node = self.node or self._find_node()
325
335
if node:
326
336
self._delete_node(node)
327
337
except KazooException: # pragma: nocover
328
338
pass
329
339
330
- def release(self):
340
+ def release(self) -> bool :
331
341
"""Release the lock immediately."""
332
342
return self.client.retry(self._inner_release)
333
343
334
- def _inner_release(self):
344
+ def _inner_release(self) -> bool :
335
345
if not self.is_acquired:
336
346
return False
337
347
338
348
try:
339
- self._delete_node(self.node)
349
+ self._delete_node(cast(str, self.node) )
340
350
except NoNodeError: # pragma: nocover
341
351
pass
342
352
343
353
self.is_acquired = False
344
354
self.node = None
345
355
return True
346
356
347
- def contenders(self):
357
+ def contenders(self) -> List[str] :
348
358
"""Return an ordered list of the current contenders for the
349
359
lock.
350
360
@@ -380,7 +390,7 @@ def contenders(self):
380
390
for match in sorted(contender_matches, key=lambda m: m.groups())
381
391
]
382
392
# Retrieve all the contender nodes data (preserving order).
383
- contenders = []
393
+ contenders: List[str] = []
384
394
for node in contender_nodes:
385
395
try:
386
396
data, stat = self.client.get(self.path + "/" + node)
@@ -391,10 +401,10 @@ def contenders(self):
391
401
392
402
return contenders
393
403
394
- def __enter__(self):
404
+ def __enter__(self) -> None :
395
405
self.acquire()
396
406
397
- def __exit__(self, exc_type, exc_value, traceback) :
407
+ def __exit__(self, exc_type: Optional[Type[BaseException]] , exc_value: Optional[BaseException] , traceback: Any) -> None :
398
408
self.release()
399
409
400
410
@@ -458,7 +468,7 @@ class ReadLock(Lock):
458
468
_EXCLUDE_NAMES = ["__lock__"]
459
469
460
470
461
- class Semaphore(object ):
471
+ class Semaphore(ContextManager[None] ):
462
472
"""A Zookeeper-based Semaphore
463
473
464
474
This synchronization primitive operates in the same manner as the
@@ -493,7 +503,13 @@ class Semaphore(object):
493
503
494
504
"""
495
505
496
- def __init__(self, client, path, identifier=None, max_leases=1):
506
+ def __init__(
507
+ self,
508
+ client: KazooClient,
509
+ path: str,
510
+ identifier: Optional[str] = None,
511
+ max_leases: int = 1,
512
+ ):
497
513
"""Create a Kazoo Lock
498
514
499
515
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -529,7 +545,7 @@ def __init__(self, client, path, identifier=None, max_leases=1):
529
545
self.cancelled = False
530
546
self._session_expired = False
531
547
532
- def _ensure_path(self):
548
+ def _ensure_path(self) -> None :
533
549
result = self.client.ensure_path(self.path)
534
550
self.assured_path = True
535
551
if result is True:
@@ -550,12 +566,14 @@ def _ensure_path(self):
550
566
else:
551
567
self.client.set(self.path, str(self.max_leases).encode("utf-8"))
552
568
553
- def cancel(self):
569
+ def cancel(self) -> None :
554
570
"""Cancel a pending semaphore acquire."""
555
571
self.cancelled = True
556
572
self.wake_event.set()
557
573
558
- def acquire(self, blocking=True, timeout=None):
574
+ def acquire(
575
+ self, blocking: bool = True, timeout: Optional[float] = None
576
+ ) -> bool:
559
577
"""Acquire the semaphore. By defaults blocks and waits forever.
560
578
561
579
:param blocking: Block until semaphore is obtained or
@@ -593,7 +611,7 @@ def acquire(self, blocking=True, timeout=None):
593
611
594
612
return self.is_acquired
595
613
596
- def _inner_acquire(self, blocking, timeout= None):
614
+ def _inner_acquire(self, blocking: bool , timeout: Optional[float] = None) -> bool :
597
615
"""Inner loop that runs from the top anytime a command hits a
598
616
retryable Zookeeper exception."""
599
617
self._session_expired = False
@@ -634,10 +652,10 @@ def _inner_acquire(self, blocking, timeout=None):
634
652
finally:
635
653
lock.release()
636
654
637
- def _watch_lease_change(self, event) :
655
+ def _watch_lease_change(self, event: WatchedEvent) -> None :
638
656
self.wake_event.set()
639
657
640
- def _get_lease(self, data=None) :
658
+ def _get_lease(self) -> bool :
641
659
# Make sure the session is still valid
642
660
if self._session_expired:
643
661
raise ForceRetryError("Retry on session loss at top")
@@ -666,25 +684,26 @@ def _get_lease(self, data=None):
666
684
# Return current state
667
685
return self.is_acquired
668
686
669
- def _watch_session(self, state) :
687
+ def _watch_session(self, state: KazooState) -> Optional[Literal[True]] :
670
688
if state == KazooState.LOST:
671
689
self._session_expired = True
672
690
self.wake_event.set()
673
691
674
692
# Return true to de-register
675
693
return True
694
+ return None
676
695
677
- def _best_effort_cleanup(self):
696
+ def _best_effort_cleanup(self) -> None :
678
697
try:
679
698
self.client.delete(self.create_path)
680
699
except KazooException: # pragma: nocover
681
700
pass
682
701
683
- def release(self):
702
+ def release(self) -> bool :
684
703
"""Release the lease immediately."""
685
704
return self.client.retry(self._inner_release)
686
705
687
- def _inner_release(self):
706
+ def _inner_release(self) -> bool :
688
707
if not self.is_acquired:
689
708
return False
690
709
try:
@@ -694,7 +713,7 @@ def _inner_release(self):
694
713
self.is_acquired = False
695
714
return True
696
715
697
- def lease_holders(self):
716
+ def lease_holders(self) -> List[str] :
698
717
"""Return an unordered list of the current lease holders.
699
718
700
719
.. note::
@@ -708,7 +727,7 @@ def lease_holders(self):
708
727
709
728
children = self.client.get_children(self.path)
710
729
711
- lease_holders = []
730
+ lease_holders: List[str] = []
712
731
for child in children:
713
732
try:
714
733
data, stat = self.client.get(self.path + "/" + child)
@@ -717,8 +736,8 @@ def lease_holders(self):
717
736
pass
718
737
return lease_holders
719
738
720
- def __enter__(self):
739
+ def __enter__(self) -> None :
721
740
self.acquire()
722
741
723
- def __exit__(self, exc_type, exc_value, traceback) :
742
+ def __exit__(self, exc_type: Optional[Type[BaseException]] , exc_value: Optional[BaseException] , traceback: Any) -> None :
724
743
self.release()
0 commit comments