diff --git a/src/charm.py b/src/charm.py index f0a119d5db..5281ee6411 100755 --- a/src/charm.py +++ b/src/charm.py @@ -825,6 +825,7 @@ def _update_members_ips(self, ip_to_add: str = None, ip_to_remove: str = None) - elif ip_to_remove: ips.remove(ip_to_remove) self._peers.data[self.app]["members_ips"] = json.dumps(ips) + self._observer.restart_observer() @retry( stop=stop_after_delay(60), @@ -859,6 +860,7 @@ def _unit_ip(self) -> str: def _on_cluster_topology_change(self, _): """Updates endpoints and (optionally) certificates when the cluster topology changes.""" logger.info("Cluster topology changed") + self._observer.restart_observer() if self.primary_endpoint: self._update_relation_endpoints() self.unit.status = ActiveStatus() diff --git a/src/cluster.py b/src/cluster.py index a45e6f889b..98e807f4f2 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -16,7 +16,6 @@ from charms.operator_libs_linux.v2 import snap from jinja2 import Template from tenacity import ( - AttemptManager, RetryError, Retrying, retry, @@ -210,7 +209,7 @@ def get_member_ip(self, member_name: str) -> str: # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt) + url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number) cluster_status = requests.get( f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", verify=self.verify, @@ -234,7 +233,7 @@ def get_member_status(self, member_name: str) -> str: # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt) + url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number) cluster_status = requests.get( f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", verify=self.verify, @@ -259,7 +258,9 @@ def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt, alternative_endpoints) + url = self._get_alternative_patroni_url( + attempt.retry_state.attempt_number, alternative_endpoints + ) cluster_status = requests.get( f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", verify=self.verify, @@ -289,7 +290,7 @@ def get_standby_leader( # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt) + url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number) cluster_status = requests.get( f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", verify=self.verify, @@ -313,7 +314,7 @@ def get_sync_standby_names(self) -> List[str]: # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_patroni_url(attempt) + url = self._get_alternative_patroni_url(attempt.retry_state.attempt_number) r = requests.get(f"{url}/cluster", verify=self.verify, auth=self._patroni_auth) for member in r.json()["members"]: if member["role"] == "sync_standby": @@ -321,7 +322,7 @@ def get_sync_standby_names(self) -> List[str]: return sync_standbys def _get_alternative_patroni_url( - self, attempt: AttemptManager, alternative_endpoints: List[str] = None + self, attempt_number: int, alternative_endpoints: List[str] = None ) -> str: """Get an alternative REST API URL from another member each time. @@ -330,9 +331,9 @@ def _get_alternative_patroni_url( """ if alternative_endpoints is not None: return self._patroni_url.replace( - self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1] + self.unit_ip, alternative_endpoints[attempt_number - 1] ) - attempt_number = attempt.retry_state.attempt_number + attempt_number = attempt_number if attempt_number > 1: url = self._patroni_url # Build the URL using http and later using https for each peer. diff --git a/src/cluster_topology_observer.py b/src/cluster_topology_observer.py index 31fd591e4a..a43c82633d 100644 --- a/src/cluster_topology_observer.py +++ b/src/cluster_topology_observer.py @@ -54,9 +54,16 @@ def __init__(self, charm: CharmBase, run_cmd: str): self._charm = charm self._run_cmd = run_cmd - def start_observer(self): + def restart_observer(self): + """Restart the cluster topology observer process.""" + self.stop_observer() + self.start_observer(skip_status_check=True) + + def start_observer(self, skip_status_check: bool = False): """Start the cluster topology observer running in a new process.""" - if not isinstance(self._charm.unit.status, ActiveStatus) or self._charm._peers is None: + if not skip_status_check and ( + not isinstance(self._charm.unit.status, ActiveStatus) or self._charm._peers is None + ): return if "observer-pid" in self._charm._peers.data[self._charm.unit]: # Double check that the PID exists @@ -80,6 +87,10 @@ def start_observer(self): "/usr/bin/python3", "src/cluster_topology_observer.py", self._charm._patroni._patroni_url, + ",".join([ + self._charm._patroni._get_alternative_patroni_url(number) + for number in range(2 * len(self._charm._peer_members_ips) + 1)[1:] + ]), f"{self._charm._patroni.verify}", self._run_cmd, self._charm.unit.name, @@ -129,30 +140,45 @@ def main(): Watch the Patroni API cluster info. When changes are detected, dispatch the change event. """ - patroni_url, verify, run_cmd, unit, charm_dir = sys.argv[1:] + patroni_url, alternative_patroni_urls, verify, run_cmd, unit, charm_dir = sys.argv[1:] previous_cluster_topology = {} + urls = [patroni_url] + list(filter(None, alternative_patroni_urls.split(","))) while True: - cluster_status = requests.get( - f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", - verify=verify, - timeout=API_REQUEST_TIMEOUT, - ) - current_cluster_topology = { - member["name"]: member["role"] for member in cluster_status.json()["members"] - } - - # If it's the first time the cluster topology was retrieved, then store it and use - # it for subsequent checks. - if not previous_cluster_topology: - previous_cluster_topology = current_cluster_topology - # If the cluster topology changed, dispatch a charm event to handle this change. - elif current_cluster_topology != previous_cluster_topology: - previous_cluster_topology = current_cluster_topology - dispatch(run_cmd, unit, charm_dir) - - # Wait some time before checking again for a cluster topology change. - sleep(30) + for url in urls: + try: + cluster_status = requests.get( + f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", + verify=verify, + timeout=API_REQUEST_TIMEOUT, + ) + except Exception as e: + with open(LOG_FILE_PATH, "a") as log_file: + log_file.write( + f"Failed to get cluster status when using {url}: {e} - {type(e)}\n" + ) + if url == urls[-1]: + with open(LOG_FILE_PATH, "a") as log_file: + log_file.write("No more peers to try to get the cluster status from.\n") + break + else: + continue + else: + current_cluster_topology = { + member["name"]: member["role"] for member in cluster_status.json()["members"] + } + + # If it's the first time the cluster topology was retrieved, then store it and use + # it for subsequent checks. + if not previous_cluster_topology: + previous_cluster_topology = current_cluster_topology + # If the cluster topology changed, dispatch a charm event to handle this change. + elif current_cluster_topology != previous_cluster_topology: + previous_cluster_topology = current_cluster_topology + dispatch(run_cmd, unit, charm_dir) + + # Wait some time before checking again for a cluster topology change. + sleep(30) if __name__ == "__main__": diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 80ec9f1975..a46f7cf665 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -201,7 +201,7 @@ def test_primary_endpoint_no_peers(harness): def test_on_leader_elected(harness): - with patch( + with patch("charm.ClusterTopologyObserver.start_observer") as _start_observer, patch( "charm.PostgresqlOperatorCharm._update_relation_endpoints", new_callable=PropertyMock ) as _update_relation_endpoints, patch( "charm.PostgresqlOperatorCharm.primary_endpoint", @@ -565,6 +565,7 @@ def test_enable_disable_extensions(harness, caplog): def test_on_start(harness): with ( + patch("charm.ClusterTopologyObserver.start_observer") as _start_observer, patch( "charm.PostgresqlOperatorCharm._restart_services_after_reboot" ) as _restart_services_after_reboot, @@ -727,6 +728,7 @@ def test_on_start_replica(harness): def test_on_start_no_patroni_member(harness): with ( + patch("charm.ClusterTopologyObserver.start_observer") as _start_observer, patch("subprocess.check_output", return_value=b"C"), patch("charm.snap.SnapCache") as _snap_cache, patch("charm.PostgresqlOperatorCharm.postgresql") as _postgresql, @@ -776,7 +778,10 @@ def test_on_start_after_blocked_state(harness): def test_on_get_password(harness): - with patch("charm.PostgresqlOperatorCharm.update_config"): + with ( + patch("charm.ClusterTopologyObserver.start_observer") as _start_observer, + patch("charm.PostgresqlOperatorCharm.update_config"), + ): rel_id = harness.model.get_relation(PEER).id # Create a mock event and set passwords in peer relation data. harness.set_leader(True) @@ -1327,6 +1332,7 @@ def test_on_cluster_topology_change(harness): patch( "charm.PostgresqlOperatorCharm.primary_endpoint", new_callable=PropertyMock ) as _primary_endpoint, + patch("charm.ClusterTopologyObserver.restart_observer") as _restart_observer, ): # Mock the property value. _primary_endpoint.side_effect = [None, "1.1.1.1"] @@ -1350,6 +1356,7 @@ def test_on_cluster_topology_change_keep_blocked(harness): patch( "charm.PostgresqlOperatorCharm._update_relation_endpoints" ) as _update_relation_endpoints, + patch("charm.ClusterTopologyObserver.restart_observer") as _restart_observer, ): harness.model.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE) @@ -1371,6 +1378,7 @@ def test_on_cluster_topology_change_clear_blocked(harness): patch( "charm.PostgresqlOperatorCharm._update_relation_endpoints" ) as _update_relation_endpoints, + patch("charm.ClusterTopologyObserver.restart_observer") as _restart_observer, ): harness.model.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE) diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 370f47c147..bae7d53dea 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -85,19 +85,14 @@ def patroni(harness, peers_ips): def test_get_alternative_patroni_url(peers_ips, patroni): - # Mock tenacity attempt. - retry = tenacity.Retrying() - retry_state = tenacity.RetryCallState(retry, None, None, None) - attempt = tenacity.AttemptManager(retry_state) - # Test the first URL that is returned (it should have the current unit IP). - url = patroni._get_alternative_patroni_url(attempt) + attempt_number = 1 + url = patroni._get_alternative_patroni_url(attempt_number) tc.assertEqual(url, f"http://{patroni.unit_ip}:8008") # Test returning the other servers URLs. - for attempt_number in range(attempt.retry_state.attempt_number + 1, len(peers_ips) + 2): - attempt.retry_state.attempt_number = attempt_number - url = patroni._get_alternative_patroni_url(attempt) + for attempt_number in range(attempt_number + 1, len(peers_ips) + 2): + url = patroni._get_alternative_patroni_url(attempt_number) assert url.split("http://")[1].split(":8008")[0] in peers_ips diff --git a/tests/unit/test_cluster_topology_observer.py b/tests/unit/test_cluster_topology_observer.py index 3a40668854..f19ace8a33 100644 --- a/tests/unit/test_cluster_topology_observer.py +++ b/tests/unit/test_cluster_topology_observer.py @@ -1,7 +1,7 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. import signal -from typing import Optional +from typing import List, Optional from unittest.mock import Mock, PropertyMock, patch import pytest @@ -51,6 +51,10 @@ def _on_cluster_topology_change(self, _) -> None: def _patroni(self) -> Patroni: return Mock(_patroni_url="http://1.1.1.1:8008/", verify=True) + @property + def _peer_members_ips(self) -> List[str]: + return [] + @property def _peers(self) -> Optional[Relation]: return None