@@ -164,6 +164,19 @@ def update_synchronous_node_count(self) -> None:
164
164
if r .status_code != 200 :
165
165
raise UpdateSyncNodeCountError (f"received { r .status_code } " )
166
166
167
def get_cluster(
    self, attempt: AttemptManager, alternative_endpoints: list[str] | None = None
) -> dict[str, str | int]:
    """Call the Patroni cluster endpoint and return its decoded JSON body.

    Args:
        attempt: the retry attempt currently in progress; it is forwarded to
            `_get_alternative_patroni_url` to pick the URL to query.
        alternative_endpoints: optional endpoints, also forwarded to
            `_get_alternative_patroni_url`.

    Returns:
        The JSON document returned by `GET <url>/cluster`.

    Raises:
        requests.HTTPError: if the endpoint answers with a 4xx/5xx status.
        requests.RequestException: on connection failure or timeout.
    """
    # NOTE(review): callers index the result with ["members"] and iterate over it,
    # so the declared value type (str | int) looks narrower than the real payload
    # -- confirm and widen the annotation if so.
    url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
    r = requests.get(
        f"{url}/cluster",
        verify=self._verify,
        auth=self._patroni_auth,
        timeout=PATRONI_TIMEOUT,
    )
    # Fail explicitly on an error status rather than decoding an error body as if
    # it were cluster data; the exception propagates to the caller's retry attempt.
    r.raise_for_status()
    return r.json()
179
+
167
180
def get_primary (
168
181
self , unit_name_pattern = False , alternative_endpoints : list [str ] | None = None
169
182
) -> str :
@@ -180,11 +193,7 @@ def get_primary(
180
193
# Request info from cluster endpoint (which returns all members of the cluster).
181
194
for attempt in Retrying (stop = stop_after_attempt (len (self ._endpoints ) + 1 )):
182
195
with attempt :
183
- url = self ._get_alternative_patroni_url (attempt , alternative_endpoints )
184
- r = requests .get (
185
- f"{ url } /cluster" , verify = self ._verify , timeout = 5 , auth = self ._patroni_auth
186
- )
187
- for member in r .json ()["members" ]:
196
+ for member in self .get_cluster (attempt , alternative_endpoints )["members" ]:
188
197
if member ["role" ] == "leader" :
189
198
primary = member ["name" ]
190
199
if unit_name_pattern :
@@ -209,14 +218,7 @@ def get_standby_leader(
209
218
# Request info from cluster endpoint (which returns all members of the cluster).
210
219
for attempt in Retrying (stop = stop_after_attempt (len (self ._endpoints ) + 1 )):
211
220
with attempt :
212
- url = self ._get_alternative_patroni_url (attempt )
213
- r = requests .get (
214
- f"{ url } /cluster" ,
215
- verify = self ._verify ,
216
- auth = self ._patroni_auth ,
217
- timeout = PATRONI_TIMEOUT ,
218
- )
219
- for member in r .json ()["members" ]:
221
+ for member in self .get_cluster (attempt )["members" ]:
220
222
if member ["role" ] == "standby_leader" :
221
223
if check_whether_is_running and member ["state" ] not in RUNNING_STATES :
222
224
logger .warning (f"standby leader { member ['name' ]} is not running" )
@@ -234,30 +236,33 @@ def get_sync_standby_names(self) -> list[str]:
234
236
# Request info from cluster endpoint (which returns all members of the cluster).
235
237
for attempt in Retrying (stop = stop_after_attempt (len (self ._endpoints ) + 1 )):
236
238
with attempt :
237
- url = self ._get_alternative_patroni_url (attempt )
238
- r = requests .get (
239
- f"{ url } /cluster" ,
240
- verify = self ._verify ,
241
- auth = self ._patroni_auth ,
242
- timeout = PATRONI_TIMEOUT ,
243
- )
244
- for member in r .json ()["members" ]:
239
+ for member in self .get_cluster (attempt )["members" ]:
245
240
if member ["role" ] == "sync_standby" :
246
241
sync_standbys .append ("/" .join (member ["name" ].rsplit ("-" , 1 )))
247
242
return sync_standbys
248
243
249
244
@property
def cluster_members(self) -> set:
    """Return the names of the members reported by the cluster endpoint."""
    # At most three attempts, with an exponential wait bounded to 2-10 seconds.
    retry_policy = Retrying(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
    )
    for attempt in retry_policy:
        with attempt:
            members = self.get_cluster(attempt)["members"]
            return {member["name"] for member in members}
253
+
254
def get_running_cluster_members(self) -> list[str]:
    """List the names of the Patroni members that are in a running state.

    This is a best-effort query: the cluster endpoint is asked once and any
    failure results in an empty list rather than an exception.

    Returns:
        The names of the members whose "state" is in RUNNING_STATES, or an
        empty list when the cluster information could not be retrieved.
    """
    try:
        # Single attempt only: Retrying supplies the attempt object that
        # get_cluster expects.
        for attempt in Retrying(stop=stop_after_attempt(1)):
            with attempt:
                return [
                    member["name"]
                    for member in self.get_cluster(attempt)["members"]
                    if member["state"] in RUNNING_STATES
                ]
    except Exception:
        # Deliberately best-effort, but keep a trace of why nothing was returned.
        logger.debug("Unable to list running cluster members", exc_info=True)
    # Reached on failure (or if no attempt ran): always hand back a list.
    return []
261
266
262
267
def are_all_members_ready (self ) -> bool :
263
268
"""Check if all members are correctly running Patroni and PostgreSQL.
@@ -271,17 +276,13 @@ def are_all_members_ready(self) -> bool:
271
276
try :
272
277
for attempt in Retrying (stop = stop_after_delay (10 ), wait = wait_fixed (3 )):
273
278
with attempt :
274
- r = requests .get (
275
- f"{ self ._patroni_url } /cluster" ,
276
- verify = self ._verify ,
277
- auth = self ._patroni_auth ,
278
- timeout = PATRONI_TIMEOUT ,
279
+ return all (
280
+ member ["state" ] in RUNNING_STATES
281
+ for member in self .get_cluster (attempt )["members" ]
279
282
)
280
283
except RetryError :
281
284
return False
282
285
283
- return all (member ["state" ] in RUNNING_STATES for member in r .json ()["members" ])
284
-
285
286
@property
286
287
def is_creating_backup (self ) -> bool :
287
288
"""Returns whether a backup is being created."""
@@ -291,20 +292,13 @@ def is_creating_backup(self) -> bool:
291
292
try :
292
293
for attempt in Retrying (stop = stop_after_delay (10 ), wait = wait_fixed (3 )):
293
294
with attempt :
294
- r = requests .get (
295
- f"{ self ._patroni_url } /cluster" ,
296
- verify = self ._verify ,
297
- auth = self ._patroni_auth ,
298
- timeout = PATRONI_TIMEOUT ,
295
+ return any (
296
+ "tags" in member and member ["tags" ].get ("is_creating_backup" )
297
+ for member in self .get_cluster (attempt )["members" ]
299
298
)
300
299
except RetryError :
301
300
return False
302
301
303
- return any (
304
- "tags" in member and member ["tags" ].get ("is_creating_backup" )
305
- for member in r .json ()["members" ]
306
- )
307
-
308
302
@property
309
303
def is_replication_healthy (self ) -> bool :
310
304
"""Return whether the replication is healthy."""
0 commit comments