4
4
:Status: Unknown
5
5
6
6
"""
7
+ from __future__ import annotations
8
+
7
9
import os
8
10
import socket
11
+ from threading import Event
12
+ from typing import TYPE_CHECKING , cast
9
13
import uuid
10
14
11
15
from kazoo .exceptions import KazooException , NoNodeError , NodeExistsError
12
16
from kazoo .protocol .states import EventType
13
17
18
+ if TYPE_CHECKING :
19
+ from typing import Optional
20
+ from typing_extensions import Literal
21
+
22
+ from kazoo .client import KazooClient
23
+ from kazoo .protocol .states import WatchedEvent
24
+
14
25
15
26
class Barrier (object ):
16
27
"""Kazoo Barrier
@@ -27,7 +38,7 @@ class Barrier(object):
27
38
28
39
"""
29
40
30
- def __init__ (self , client , path ):
41
+ def __init__ (self , client : KazooClient , path : str ):
31
42
"""Create a Kazoo Barrier
32
43
33
44
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -37,11 +48,11 @@ def __init__(self, client, path):
37
48
self .client = client
38
49
self .path = path
39
50
40
- def create (self ):
51
+ def create (self ) -> None :
41
52
"""Establish the barrier if it doesn't exist already"""
42
53
self .client .retry (self .client .ensure_path , self .path )
43
54
44
- def remove (self ):
55
+ def remove (self ) -> bool :
45
56
"""Remove the barrier
46
57
47
58
:returns: Whether the barrier actually needed to be removed.
@@ -54,17 +65,17 @@ def remove(self):
54
65
except NoNodeError :
55
66
return False
56
67
57
- def wait (self , timeout = None ):
68
+ def wait (self , timeout : Optional [ float ] = None ) -> bool :
58
69
"""Wait on the barrier to be cleared
59
70
60
71
:returns: True if the barrier has been cleared, otherwise
61
72
False.
62
73
:rtype: bool
63
74
64
75
"""
65
- cleared = self .client .handler .event_object ()
76
+ cleared = cast ( Event , self .client .handler .event_object () )
66
77
67
- def wait_for_clear (event ) :
78
+ def wait_for_clear (event : WatchedEvent ) -> None :
68
79
if event .type == EventType .DELETED :
69
80
cleared .set ()
70
81
@@ -93,7 +104,13 @@ class DoubleBarrier(object):
93
104
94
105
"""
95
106
96
- def __init__ (self , client , path , num_clients , identifier = None ):
107
+ def __init__ (
108
+ self ,
109
+ client : KazooClient ,
110
+ path : str ,
111
+ num_clients : int ,
112
+ identifier : Optional [str ] = None ,
113
+ ):
97
114
"""Create a Double Barrier
98
115
99
116
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -118,7 +135,7 @@ def __init__(self, client, path, num_clients, identifier=None):
118
135
self .node_name = uuid .uuid4 ().hex
119
136
self .create_path = self .path + "/" + self .node_name
120
137
121
- def enter (self ):
138
+ def enter (self ) -> None :
122
139
"""Enter the barrier, blocks until all nodes have entered"""
123
140
try :
124
141
self .client .retry (self ._inner_enter )
@@ -128,7 +145,7 @@ def enter(self):
128
145
self ._best_effort_cleanup ()
129
146
self .participating = False
130
147
131
- def _inner_enter (self ):
148
+ def _inner_enter (self ) -> Literal [ True ] :
132
149
# make sure our barrier parent node exists
133
150
if not self .assured_path :
134
151
self .client .ensure_path (self .path )
@@ -145,7 +162,7 @@ def _inner_enter(self):
145
162
except NodeExistsError :
146
163
pass
147
164
148
- def created (event ) :
165
+ def created (event : WatchedEvent ) -> None :
149
166
if event .type == EventType .CREATED :
150
167
ready .set ()
151
168
@@ -159,7 +176,7 @@ def created(event):
159
176
self .client .ensure_path (self .path + "/ready" )
160
177
return True
161
178
162
- def leave (self ):
179
+ def leave (self ) -> None :
163
180
"""Leave the barrier, blocks until all nodes have left"""
164
181
try :
165
182
self .client .retry (self ._inner_leave )
@@ -168,7 +185,7 @@ def leave(self):
168
185
self ._best_effort_cleanup ()
169
186
self .participating = False
170
187
171
- def _inner_leave (self ):
188
+ def _inner_leave (self ) -> Literal [ True ] :
172
189
# Delete the ready node if its around
173
190
try :
174
191
self .client .delete (self .path + "/ready" )
@@ -188,7 +205,7 @@ def _inner_leave(self):
188
205
189
206
ready = self .client .handler .event_object ()
190
207
191
- def deleted (event ) :
208
+ def deleted (event : WatchedEvent ) -> None :
192
209
if event .type == EventType .DELETED :
193
210
ready .set ()
194
211
@@ -214,7 +231,7 @@ def deleted(event):
214
231
# Wait for the lowest to be deleted
215
232
ready .wait ()
216
233
217
- def _best_effort_cleanup (self ):
234
+ def _best_effort_cleanup (self ) -> None :
218
235
try :
219
236
self .client .retry (self .client .delete , self .create_path )
220
237
except NoNodeError :
0 commit comments