|
17 | 17 | #
|
18 | 18 |
|
19 | 19 | import argparse
|
| 20 | +import contextlib |
20 | 21 | import logging
|
21 | 22 | import os
|
22 | 23 | import socket
|
@@ -311,28 +312,15 @@ def ctdb_manage_nodes(ctx: Context) -> None:
|
311 | 312 | expected_pnn = np.node_number or 0
|
312 | 313 | waiter = np.cluster_meta_waiter()
|
313 | 314 |
|
314 |
| - errors = 0 |
| 315 | + limiter = ErrorLimiter("ctdb_manage_nodes", 10, pause_func=waiter.wait) |
315 | 316 | while True:
|
316 |
| - try: |
| 317 | + with limiter.catch(): |
317 | 318 | ctdb.monitor_cluster_meta_updates(
|
318 | 319 | cmeta=np.cluster_meta(),
|
319 | 320 | pnn=expected_pnn,
|
320 | 321 | real_path=np.persistent_path,
|
321 | 322 | pause_func=waiter.wait,
|
322 | 323 | )
|
323 |
| - errors = 0 |
324 |
| - except KeyboardInterrupt: |
325 |
| - raise |
326 |
| - except Exception as err: |
327 |
| - _logger.error( |
328 |
| - f"error during manage_nodes: {err}, count={errors}", |
329 |
| - exc_info=True, |
330 |
| - ) |
331 |
| - errors += 1 |
332 |
| - if errors > 10: |
333 |
| - _logger.error(f"too many retries ({errors}). giving up") |
334 |
| - raise |
335 |
| - waiter.wait() |
336 | 324 |
|
337 | 325 |
|
338 | 326 | def _ctdb_must_have_node_args(parser: argparse.ArgumentParser) -> None:
|
@@ -411,3 +399,43 @@ def ctdb_rados_mutex(ctx: Context) -> None:
|
411 | 399 | cmd = cmd["-n", namespace]
|
412 | 400 | _logger.debug("executing command: %r", cmd)
|
413 | 401 | samba_cmds.execute(cmd) # replaces process
|
| 402 | + |
| 403 | + |
| 404 | +class ErrorLimiter: |
| 405 | + def __init__( |
| 406 | + self, |
| 407 | + name: str, |
| 408 | + limit: int, |
| 409 | + *, |
| 410 | + pause_func: typing.Optional[typing.Callable] = None, |
| 411 | + ) -> None: |
| 412 | + self.name = name |
| 413 | + self.limit = limit |
| 414 | + self.errors = 0 |
| 415 | + self.pause_func = pause_func |
| 416 | + |
| 417 | + def post_catch(self): |
| 418 | + if self.pause_func is not None: |
| 419 | + self.pause_func() |
| 420 | + |
| 421 | + @contextlib.contextmanager |
| 422 | + def catch(self) -> typing.Iterator[None]: |
| 423 | + try: |
| 424 | + _logger.debug( |
| 425 | + "error limiter proceeding: %s: errors=%r", |
| 426 | + self.name, |
| 427 | + self.errors, |
| 428 | + ) |
| 429 | + yield |
| 430 | + except KeyboardInterrupt: |
| 431 | + raise |
| 432 | + except Exception as err: |
| 433 | + _logger.error( |
| 434 | + f"error during {self.name}: {err}, count={self.errors}", |
| 435 | + exc_info=True, |
| 436 | + ) |
| 437 | + self.errors += 1 |
| 438 | + if self.errors > self.limit: |
| 439 | + _logger.error(f"too many retries ({self.errors}). giving up") |
| 440 | + raise |
| 441 | + self.post_catch() |
0 commit comments