|
23 | 23 | import typing
|
24 | 24 |
|
25 | 25 | from sambacc import config
|
| 26 | +from sambacc import leader |
26 | 27 | from sambacc import samba_cmds
|
27 | 28 | from sambacc.jfile import ClusterMetaJSONFile
|
28 | 29 | from sambacc.netcmd_loader import template_config
|
@@ -509,6 +510,87 @@ def _save_nodes(path: str, ctdb_nodes: list[str]) -> None:
|
509 | 510 | os.fsync(nffh)
|
510 | 511 |
|
511 | 512 |
|
| 513 | +def monitor_cluster_meta_changes( |
| 514 | + cmeta: ClusterMeta, |
| 515 | + pause_func: typing.Callable, |
| 516 | + *, |
| 517 | + nodes_file_path: typing.Optional[str] = None, |
| 518 | + reload_all: bool = False, |
| 519 | + leader_locator: typing.Optional[leader.LeaderLocator] = None, |
| 520 | +) -> None: |
| 521 | + """Monitor cluster meta for changes, reflecting those changes into ctdb. |
| 522 | +
|
| 523 | + Unlike manage_cluster_meta_updates this function never changes the |
| 524 | + contents of the nodes list in the cluster meta and takes those values |
| 525 | + as a given, assuming some external agent has the correct global view of |
| 526 | + the cluster and is updating it correctly. This function exists to |
| 527 | + translate that content into something ctdb can understand. |
| 528 | + """ |
| 529 | + prev_meta: dict[str, typing.Any] = {} |
| 530 | + if nodes_file_path: |
| 531 | + prev_nodes = read_ctdb_nodes(nodes_file_path) |
| 532 | + else: |
| 533 | + with cmeta.open(locked=True) as cmo: |
| 534 | + meta1 = cmo.load() |
| 535 | + prev_nodes = _cluster_meta_to_ctdb_nodes(meta1.get("nodes", [])) |
| 536 | + _logger.debug("initial cluster meta content: %r", prev_meta) |
| 537 | + _logger.debug("initial nodes content: %r", prev_nodes) |
| 538 | + while True: |
| 539 | + pause_func() |
| 540 | + with cmeta.open(locked=True) as cmo: |
| 541 | + curr_meta = cmo.load() |
| 542 | + if curr_meta == prev_meta: |
| 543 | + _logger.debug("cluster meta content unchanged: %r", curr_meta) |
| 544 | + continue |
| 545 | + _logger.info("cluster meta content changed") |
| 546 | + _logger.debug( |
| 547 | + "cluster meta: previous=%r current=%r", prev_meta, curr_meta |
| 548 | + ) |
| 549 | + prev_meta = curr_meta |
| 550 | + |
| 551 | + # maybe some other metadata changed? |
| 552 | + expected_nodes = _cluster_meta_to_ctdb_nodes( |
| 553 | + curr_meta.get("nodes", []) |
| 554 | + ) |
| 555 | + if prev_nodes == expected_nodes: |
| 556 | + _logger.debug("ctdb nodes list unchanged: %r", expected_nodes) |
| 557 | + continue |
| 558 | + _logger.info("ctdb nodes list changed") |
| 559 | + _logger.debug( |
| 560 | + "nodes list: previous=%r current=%r", prev_nodes, expected_nodes |
| 561 | + ) |
| 562 | + prev_nodes = expected_nodes |
| 563 | + |
| 564 | + if nodes_file_path: |
| 565 | + _logger.info("updating nodes file: %s", nodes_file_path) |
| 566 | + _save_nodes(nodes_file_path, expected_nodes) |
| 567 | + _maybe_reload_nodes(leader_locator, reload_all=reload_all) |
| 568 | + |
| 569 | + |
| 570 | +def _maybe_reload_nodes( |
| 571 | + leader_locator: typing.Optional[leader.LeaderLocator] = None, |
| 572 | + reload_all: bool = False, |
| 573 | +) -> None: |
| 574 | + """Issue a reloadnodes command if leader_locator is available and |
| 575 | + node is leader or reload_all is true. |
| 576 | + """ |
| 577 | + if reload_all: |
| 578 | + _logger.info("running: ctdb reloadnodes") |
| 579 | + subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"])) |
| 580 | + return |
| 581 | + if leader_locator is None: |
| 582 | + _logger.warning("no leader locator: not calling reloadnodes") |
| 583 | + return |
| 584 | + # use the leader locator to only issue the reloadnodes command once |
| 585 | + # for a change instead of all the nodes "spamming" the cluster |
| 586 | + with leader_locator as ll: |
| 587 | + if ll.is_leader(): |
| 588 | + _logger.info("running: ctdb reloadnodes") |
| 589 | + subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"])) |
| 590 | + else: |
| 591 | + _logger.info("node is not leader. skipping reloadnodes") |
| 592 | + |
| 593 | + |
512 | 594 | def ensure_ctdbd_etc_files(
|
513 | 595 | etc_path: str = ETC_DIR, src_path: str = SHARE_DIR
|
514 | 596 | ) -> None:
|
|
0 commit comments