4
4
:Status: Unknown
5
5
6
6
"""
7
+ from __future__ import annotations
8
+
7
9
import os
8
10
import socket
11
+ from typing import TYPE_CHECKING
9
12
import uuid
10
13
11
14
from kazoo .exceptions import KazooException , NoNodeError , NodeExistsError
12
15
from kazoo .protocol .states import EventType
13
16
17
+ if TYPE_CHECKING :
18
+ from typing import Optional
19
+ from typing_extensions import Literal
20
+
21
+ from kazoo .client import KazooClient
22
+ from kazoo .protocol .states import WatchedEvent
23
+
14
24
15
25
class Barrier (object ):
16
26
"""Kazoo Barrier
@@ -27,7 +37,7 @@ class Barrier(object):
27
37
28
38
"""
29
39
30
- def __init__ (self , client , path ):
40
+ def __init__ (self , client : KazooClient , path : str ):
31
41
"""Create a Kazoo Barrier
32
42
33
43
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -37,11 +47,11 @@ def __init__(self, client, path):
37
47
self .client = client
38
48
self .path = path
39
49
40
- def create (self ):
50
+ def create (self ) -> None :
41
51
"""Establish the barrier if it doesn't exist already"""
42
52
self .client .retry (self .client .ensure_path , self .path )
43
53
44
- def remove (self ):
54
+ def remove (self ) -> bool :
45
55
"""Remove the barrier
46
56
47
57
:returns: Whether the barrier actually needed to be removed.
@@ -54,7 +64,7 @@ def remove(self):
54
64
except NoNodeError :
55
65
return False
56
66
57
- def wait (self , timeout = None ):
67
+ def wait (self , timeout : Optional [ float ] = None ) -> bool :
58
68
"""Wait on the barrier to be cleared
59
69
60
70
:returns: True if the barrier has been cleared, otherwise
@@ -64,7 +74,7 @@ def wait(self, timeout=None):
64
74
"""
65
75
cleared = self .client .handler .event_object ()
66
76
67
- def wait_for_clear (event ) :
77
+ def wait_for_clear (event : WatchedEvent ) -> None :
68
78
if event .type == EventType .DELETED :
69
79
cleared .set ()
70
80
@@ -93,7 +103,13 @@ class DoubleBarrier(object):
93
103
94
104
"""
95
105
96
- def __init__ (self , client , path , num_clients , identifier = None ):
106
+ def __init__ (
107
+ self ,
108
+ client : KazooClient ,
109
+ path : str ,
110
+ num_clients : int ,
111
+ identifier : Optional [str ] = None ,
112
+ ):
97
113
"""Create a Double Barrier
98
114
99
115
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -118,7 +134,7 @@ def __init__(self, client, path, num_clients, identifier=None):
118
134
self .node_name = uuid .uuid4 ().hex
119
135
self .create_path = self .path + "/" + self .node_name
120
136
121
- def enter (self ):
137
+ def enter (self ) -> None :
122
138
"""Enter the barrier, blocks until all nodes have entered"""
123
139
try :
124
140
self .client .retry (self ._inner_enter )
@@ -128,7 +144,7 @@ def enter(self):
128
144
self ._best_effort_cleanup ()
129
145
self .participating = False
130
146
131
- def _inner_enter (self ):
147
+ def _inner_enter (self ) -> Literal [ True ] :
132
148
# make sure our barrier parent node exists
133
149
if not self .assured_path :
134
150
self .client .ensure_path (self .path )
@@ -145,7 +161,7 @@ def _inner_enter(self):
145
161
except NodeExistsError :
146
162
pass
147
163
148
- def created (event ) :
164
+ def created (event : WatchedEvent ) -> None :
149
165
if event .type == EventType .CREATED :
150
166
ready .set ()
151
167
@@ -159,7 +175,7 @@ def created(event):
159
175
self .client .ensure_path (self .path + "/ready" )
160
176
return True
161
177
162
- def leave (self ):
178
+ def leave (self ) -> None :
163
179
"""Leave the barrier, blocks until all nodes have left"""
164
180
try :
165
181
self .client .retry (self ._inner_leave )
@@ -168,7 +184,7 @@ def leave(self):
168
184
self ._best_effort_cleanup ()
169
185
self .participating = False
170
186
171
- def _inner_leave (self ):
187
+ def _inner_leave (self ) -> Literal [ True ] :
172
188
# Delete the ready node if its around
173
189
try :
174
190
self .client .delete (self .path + "/ready" )
@@ -188,7 +204,7 @@ def _inner_leave(self):
188
204
189
205
ready = self .client .handler .event_object ()
190
206
191
- def deleted (event ) :
207
+ def deleted (event : WatchedEvent ) -> None :
192
208
if event .type == EventType .DELETED :
193
209
ready .set ()
194
210
@@ -214,7 +230,7 @@ def deleted(event):
214
230
# Wait for the lowest to be deleted
215
231
ready .wait ()
216
232
217
- def _best_effort_cleanup (self ):
233
+ def _best_effort_cleanup (self ) -> None :
218
234
try :
219
235
self .client .retry (self .client .delete , self .create_path )
220
236
except NoNodeError :
0 commit comments