diff --git a/src/charm.py b/src/charm.py
index 535bff80e1..29d108b121 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -83,6 +83,7 @@
     TLS_CERT_FILE,
     TLS_KEY_FILE,
     UNIT_SCOPE,
+    UPGRADE_RELATION,
     USER,
     USER_PASSWORD_KEY,
 )
@@ -96,6 +97,7 @@
 
 PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit"
 EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs"
+DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE = "Please select the correct version of PostgreSQL to use. Different PostgreSQL versions cannot be used in the same cluster!"
 
 Scopes = Literal[APP_SCOPE, UNIT_SCOPE]
 
@@ -144,6 +146,9 @@ def __init__(self, *args):
         self.framework.observe(self.on.config_changed, self._on_config_changed)
         self.framework.observe(self.on.get_primary_action, self._on_get_primary)
         self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
+        self.framework.observe(
+            self.on[UPGRADE_RELATION].relation_changed, self._on_upgrade_relation_changed
+        )
         self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed)
         self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
         self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
@@ -497,6 +502,11 @@ def _on_peer_relation_changed(self, event: HookEvent):
         try:
             # Update the members of the cluster in the Patroni configuration on this unit.
             self.update_config()
+            if self._patroni.cluster_system_id_mismatch(unit_name=self.unit.name):
+                self.unit.status = BlockedStatus(
+                    "Failed to start postgresql. The storage belongs to a third-party cluster"
+                )
+                return
         except RetryError:
             self.unit.status = BlockedStatus("failed to update cluster members on member")
             return
@@ -537,6 +547,19 @@ def _on_peer_relation_changed(self, event: HookEvent):
 
         self._update_new_unit_status()
 
+        self._validate_database_version()
+
+    def _on_upgrade_relation_changed(self, event: HookEvent):
+        if not self.unit.is_leader():
+            return
+
+        if not self.upgrade.idle:
+            logger.debug("Defer _on_upgrade_relation_changed: upgrade in progress")
+            event.defer()
+            return
+
+        self._set_workload_version(self._patroni.get_postgresql_version())
+
     # Split off into separate function, because of complexity _on_peer_relation_changed
     def _start_stop_pgbackrest_service(self, event: HookEvent) -> None:
         # Start or stop the pgBackRest TLS server service when TLS certificate change.
@@ -1034,7 +1057,7 @@ def _on_start(self, event: StartEvent) -> None:
 
         self.unit_peer_data.update({"ip": self.get_hostname_by_unit(None)})
 
-        self.unit.set_workload_version(self._patroni.get_postgresql_version())
+        self._set_workload_version(self._patroni.get_postgresql_version())
 
         # Open port
         try:
@@ -1640,6 +1663,26 @@ def client_relations(self) -> List[Relation]:
             relations.append(relation)
         return relations
 
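A quick way to exercise the version guard added below is a Harness-based unit test. The following is only an illustrative sketch, not part of this change: the peer relation name and the `database-version` key come from this diff, while the patch target and test structure are assumptions based on the existing unit-test style.

```python
# Illustrative only: a mismatched local PostgreSQL version should block the unit.
from unittest.mock import patch

from ops.model import BlockedStatus
from ops.testing import Harness

from charm import PostgresqlOperatorCharm


def test_blocks_on_postgresql_version_mismatch():
    harness = Harness(PostgresqlOperatorCharm)
    harness.begin()
    with harness.hooks_disabled():
        # "database-peers" is the PEER relation defined in src/constants.py.
        rel_id = harness.add_relation("database-peers", harness.charm.app.name)
        harness.update_relation_data(
            rel_id, harness.charm.app.name, {"database-version": "14"}
        )
    # A non-leader unit reporting a different local version must end up blocked.
    with patch("charm.Patroni.get_postgresql_version", return_value="15"):
        harness.charm._validate_database_version()
    assert isinstance(harness.charm.unit.status, BlockedStatus)
    harness.cleanup()
```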
+    def _set_workload_version(self, psql_version):
+        """Record the version of the software running as the workload.
+
+        Also writes the version into the application peer databag.
+        """
+        self.unit.set_workload_version(psql_version)
+        if self.unit.is_leader():
+            self.app_peer_data.update({"database-version": psql_version})
+
+    def _validate_database_version(self):
+        """Check that all units run the same PostgreSQL version."""
+        peer_db_version = self.app_peer_data.get("database-version")
+
+        if self.unit.is_leader() and peer_db_version is None:
+            _psql_version = self._patroni.get_postgresql_version()
+            if _psql_version is not None:
+                self.app_peer_data.update({"database-version": _psql_version})
+            return
+
+        if peer_db_version != self._patroni.get_postgresql_version():
+            self.unit.status = BlockedStatus(DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE)
+            return
+
 
 if __name__ == "__main__":
     main(PostgresqlOperatorCharm)
diff --git a/src/cluster.py b/src/cluster.py
index 4b08ff5a58..20047bf880 100644
--- a/src/cluster.py
+++ b/src/cluster.py
@@ -4,6 +4,7 @@
 
 """Helper class used to manage cluster lifecycle."""
 
+import glob
 import logging
 import os
 import pwd
@@ -713,3 +714,39 @@ def update_synchronous_node_count(self, units: int = None) -> None:
         # Check whether the update was unsuccessful.
         if r.status_code != 200:
             raise UpdateSyncNodeCountError(f"received {r.status_code}")
+
+    def cluster_system_id_mismatch(self, unit_name: str) -> bool:
+        """Check whether Patroni failed to start because the storage belongs to another cluster.
+
+        The latest Patroni log file is scanned for a system ID mismatch error.
+
+        Returns:
+            True if the storage attached to this unit belongs to a different cluster.
+        """
+        last_log_file = self._last_patroni_log_file()
+        unit_name = unit_name.replace("/", "-")
+        if (
+            f" CRITICAL: system ID mismatch, node {unit_name} belongs to a different cluster:"
+            in last_log_file
+        ):
+            return True
+        return False
+
+    def _last_patroni_log_file(self) -> str:
+        """Get the content of the latest Patroni service log file.
+
+        Returns:
+            Content of the latest Patroni log file, or an empty string if no log files are available.
+        """
+        log_files = glob.glob(f"{PATRONI_LOGS_PATH}/*.log")
+        if len(log_files) == 0:
+            return ""
+        latest_file = max(log_files, key=os.path.getmtime)
+        try:
+            with open(latest_file) as last_log_file:
+                return last_log_file.read()
+        except OSError as e:
+            logger.exception("Failed to read last patroni log file", exc_info=e)
+            return ""
diff --git a/src/constants.py b/src/constants.py
index c157accc04..2a727b08d2 100644
--- a/src/constants.py
+++ b/src/constants.py
@@ -10,6 +10,7 @@
 LEGACY_DB = "db"
 LEGACY_DB_ADMIN = "db-admin"
 PEER = "database-peers"
+UPGRADE_RELATION = "upgrade"
 ALL_CLIENT_RELATIONS = [DATABASE, LEGACY_DB, LEGACY_DB_ADMIN]
 ALL_LEGACY_RELATIONS = [LEGACY_DB, LEGACY_DB_ADMIN]
 API_REQUEST_TIMEOUT = 5
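The log-scanning helpers added to src/cluster.py can be unit-tested without a real Patroni installation. A rough sketch follows; the monkeypatched module constant and the way the `Patroni` instance is created are assumptions for illustration only, not part of this change.

```python
# Illustrative only: detect the "system ID mismatch" error from a fake log file.
import cluster


def test_cluster_system_id_mismatch(tmp_path, monkeypatch):
    log_file = tmp_path / "patroni.log"
    log_file.write_text(
        "2023-10-01 12:00:00 UTC CRITICAL: system ID mismatch, node postgresql-1"
        " belongs to a different cluster: 7283716381 != 7283716382\n"
    )
    # Point the helper at the temporary log directory.
    monkeypatch.setattr(cluster, "PATRONI_LOGS_PATH", str(tmp_path))
    # Bypass Patroni.__init__ (its arguments are outside the scope of this diff).
    patroni = object.__new__(cluster.Patroni)
    assert patroni.cluster_system_id_mismatch(unit_name="postgresql/1")
    assert not patroni.cluster_system_id_mismatch(unit_name="postgresql/2")
```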
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index 98e0589b18..650015cafd 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -41,6 +41,7 @@
 PATRONI_SERVICE_DEFAULT_PATH = f"/etc/systemd/system/{SERVICE_NAME}"
 RESTART_CONDITION = "no"
 ORIGINAL_RESTART_CONDITION = "always"
+SECOND_APPLICATION = "second-cluster"
 
 
 class MemberNotListedOnClusterError(Exception):
@@ -897,11 +898,20 @@ def storage_id(ops_test, unit_name):
             return line.split()[1]
 
 
-async def add_unit_with_storage(ops_test, app, storage):
+async def add_unit_with_storage(
+    ops_test, app, storage, is_blocked: bool = False, blocked_message: str = ""
+):
     """Adds unit with storage.
 
     Note: this function exists as a temporary solution until this issue is resolved:
     https://github.com/juju/python-libjuju/issues/695
+
+    Args:
+        ops_test: The ops test framework instance
+        app: The name of the application
+        storage: Unique storage identifier
+        is_blocked: whether the new unit is expected to end up in blocked status
+        blocked_message: the expected message of the blocked status
     """
     expected_units = len(ops_test.model.applications[app].units) + 1
     prev_units = [unit.name for unit in ops_test.model.applications[app].units]
@@ -910,7 +920,22 @@ async def add_unit_with_storage(ops_test, app, storage):
     return_code, _, _ = await ops_test.juju(*add_unit_cmd)
     assert return_code == 0, "Failed to add unit with storage"
     async with ops_test.fast_forward():
-        await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=2000)
+        if is_blocked:
+            assert (
+                blocked_message != ""
+            ), "blocked_message must be provided when is_blocked is set"
+            application = ops_test.model.applications[app]
+            await ops_test.model.block_until(
+                lambda: any(
+                    unit.workload_status == "blocked"
+                    and unit.workload_status_message == blocked_message
+                    for unit in application.units
+                ),
+                timeout=1500,
+            )
+        else:
+            await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1500)
     assert (
         len(ops_test.model.applications[app].units) == expected_units
     ), "New unit not added to model"
@@ -954,6 +979,84 @@ async def reused_full_cluster_recovery_storage(ops_test: OpsTest, unit_name) ->
     return True
 
 
+async def get_db_connection(ops_test, dbname, is_primary=True, replica_unit_name=""):
+    """Return a PostgreSQL connection string.
+
+    Args:
+        ops_test: The ops test framework instance
+        dbname: The name of the database
+        is_primary: whether to connect to the primary unit (defaults to True)
+        replica_unit_name: The name of the replica unit to connect to instead
+
+    Returns:
+        a PostgreSQL connection string and the name of the unit it targets
+    """
+    unit_name = await get_primary(ops_test, APP_NAME)
+    password = await get_password(ops_test, APP_NAME)
+    address = get_unit_address(ops_test, unit_name)
+    if not is_primary and replica_unit_name != "":
+        unit_name = replica_unit_name
+        address = ops_test.model.units[unit_name].public_address
+    connection_string = (
+        f"dbname='{dbname}' user='operator'"
+        f" host='{address}' password='{password}' connect_timeout=10"
+    )
+    return connection_string, unit_name
+
+
+async def validate_test_data(connection_string):
+    """Check that the test data is present in the database.
+
+    Args:
+        connection_string: Database connection string
+    """
+    with psycopg2.connect(connection_string) as connection:
+        connection.autocommit = True
+        with connection.cursor() as cursor:
+            cursor.execute("SELECT data FROM test;")
+            data = cursor.fetchone()
+            assert data[0] == "some data"
+    connection.close()
+
+
+async def create_test_data(connection_string):
+    """Create test data in the database.
+
+    Args:
+        connection_string: Database connection string
+    """
+    with psycopg2.connect(connection_string) as connection:
+        connection.autocommit = True
+        with connection.cursor() as cursor:
+            # Check that it's possible to write and read data from the database that
+            # was created for the application.
+            cursor.execute("DROP TABLE IF EXISTS test;")
+            cursor.execute("CREATE TABLE test(data TEXT);")
+            cursor.execute("INSERT INTO test(data) VALUES('some data');")
+            cursor.execute("SELECT data FROM test;")
+            data = cursor.fetchone()
+            assert data[0] == "some data"
+    connection.close()
+
+
+async def get_last_added_unit(ops_test, app, prev_units):
+    """Return the most recently added unit of an application.
+
+    Args:
+        ops_test: The ops test framework instance
+        app: The name of the application
+        prev_units: List of unit names before the last unit was added
+
+    Returns:
+        the last added unit
+    """
+    curr_units = [unit.name for unit in ops_test.model.applications[app].units]
+    new_unit = list(set(curr_units) - set(prev_units))[0]
+    for unit in ops_test.model.applications[app].units:
+        if new_unit == unit.name:
+            return unit
+
+
 async def is_storage_exists(ops_test: OpsTest, storage_id: str) -> bool:
     """Returns True if storage exists by provided storage ID."""
     complete_command = [
diff --git a/tests/integration/ha_tests/test_restore_cluster.py b/tests/integration/ha_tests/test_restore_cluster.py
index d6af07e251..ed071c79f6 100644
--- a/tests/integration/ha_tests/test_restore_cluster.py
+++ b/tests/integration/ha_tests/test_restore_cluster.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 # Copyright 2023 Canonical Ltd.
 # See LICENSE file for licensing details.
+import asyncio
 import logging
 
 import pytest
@@ -16,13 +17,13 @@
     set_password,
 )
 from .helpers import (
+    SECOND_APPLICATION,
     add_unit_with_storage,
     reused_full_cluster_recovery_storage,
     storage_id,
 )
 
 FIRST_APPLICATION = "first-cluster"
-SECOND_APPLICATION = "second-cluster"
 
 logger = logging.getLogger(__name__)
@@ -37,24 +38,23 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
     charm = await ops_test.build_charm(".")
     async with ops_test.fast_forward():
         # Deploy the first cluster with reusable storage
-        await ops_test.model.deploy(
-            charm,
-            application_name=FIRST_APPLICATION,
-            num_units=3,
-            series=CHARM_SERIES,
-            storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
-            config={"profile": "testing"},
+        await asyncio.gather(
+            ops_test.model.deploy(
+                charm,
+                application_name=FIRST_APPLICATION,
+                num_units=3,
+                series=CHARM_SERIES,
+                storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
+                config={"profile": "testing"},
+            ),
+            ops_test.model.deploy(
+                charm,
+                application_name=SECOND_APPLICATION,
+                num_units=1,
+                series=CHARM_SERIES,
+                config={"profile": "testing"},
+            ),
         )
-
-        # Deploy the second cluster
-        await ops_test.model.deploy(
-            charm,
-            application_name=SECOND_APPLICATION,
-            num_units=1,
-            series=CHARM_SERIES,
-            config={"profile": "testing"},
-        )
-
         await ops_test.model.wait_for_idle(status="active", timeout=1500)
 
     # TODO have a better way to bootstrap clusters with existing storage
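In the zero-units test added below, the success path after scaling down is that Patroni's REST API (port 8008) stops answering, so a connection error is the expected outcome. A small helper along these lines captures that logic; it is illustrative only (not part of this change) and uses the regular `requests` package rather than pip's vendored copy.

```python
import requests


def patroni_is_down(unit_ip: str) -> bool:
    """Return True if Patroni's REST API on the given unit is unreachable or unhealthy."""
    try:
        response = requests.get(f"http://{unit_ip}:8008", timeout=5)
    except requests.exceptions.ConnectionError:
        # Connection refused/reset: the unit (and Patroni) is down.
        return True
    return response.status_code != 200
```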
diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py
index 63d5b5abaa..fbb46aa445 100644
--- a/tests/integration/ha_tests/test_self_healing.py
+++ b/tests/integration/ha_tests/test_self_healing.py
@@ -5,6 +5,7 @@
 import logging
 
 import pytest
+import requests
 from pytest_operator.plugin import OpsTest
 from tenacity import Retrying, stop_after_delay, wait_fixed
@@ -15,11 +16,13 @@
     get_password,
     get_unit_address,
     run_command_on_unit,
+    scale_application,
 )
 from .conftest import APPLICATION_NAME
 from .helpers import (
     METADATA,
     ORIGINAL_RESTART_CONDITION,
+    SECOND_APPLICATION,
     add_unit_with_storage,
     app_name,
     are_all_db_processes_down,
@@ -27,10 +30,13 @@
     change_patroni_setting,
     change_wal_settings,
     check_writes,
+    create_test_data,
     cut_network_from_unit,
     cut_network_from_unit_without_ip_change,
     fetch_cluster_members,
     get_controller_machine,
+    get_db_connection,
+    get_last_added_unit,
     get_patroni_setting,
     get_primary,
     get_unit_ip,
@@ -49,6 +55,7 @@
     storage_id,
     storage_type,
     update_restart_condition,
+    validate_test_data,
     wait_network_restore,
 )
@@ -58,6 +65,9 @@
 PATRONI_PROCESS = "/snap/charmed-postgresql/[0-9]*/usr/bin/patroni"
 POSTGRESQL_PROCESS = "postgres"
 DB_PROCESSES = [POSTGRESQL_PROCESS, PATRONI_PROCESS]
+FOREIGN_CLUSTER_STORAGE_BLOCKING_MESSAGE = (
+    "Failed to start postgresql. The storage belongs to a third-party cluster"
+)
 
 
 @pytest.mark.group(1)
@@ -78,6 +88,14 @@
             storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
             config={"profile": "testing"},
         )
+        await ops_test.model.deploy(
+            charm,
+            num_units=1,
+            application_name=SECOND_APPLICATION,
+            series=CHARM_SERIES,
+            storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
+            config={"profile": "testing"},
+        )
     # Deploy the continuous writes application charm if it wasn't already deployed.
     if not await app_name(ops_test, APPLICATION_NAME):
         wait_for_apps = True
@@ -543,3 +561,123 @@ async def test_network_cut_without_ip_change(
         ), "Connection is not possible after network restore"
 
     await is_cluster_updated(ops_test, primary_name, use_ip_from_inside=True)
+
+
+@pytest.mark.group(1)
+async def test_deploy_zero_units(ops_test: OpsTest, charm):
+    """Scale the database to zero units and scale up again."""
+    app = await app_name(ops_test)
+    dbname = f"{APPLICATION_NAME.replace('-', '_')}_first_database"
+    connection_string, _ = await get_db_connection(ops_test, dbname=dbname)
+
+    # Start an application that continuously writes data to the database.
+    await start_continuous_writes(ops_test, app)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    # Connect to the database and create the test data.
+    logger.info("connect to DB and create test table")
+    await create_test_data(connection_string)
+
+    # TODO: once a charm revision ships a different PostgreSQL version, add a test that
+    # mixes versions and checks that the charm blocks with
+    # DIFFERENT_VERSIONS_PSQL_BLOCKING_MESSAGE.
+
+    unit_ip_addresses = []
+    primary_storage = ""
+    for unit in ops_test.model.applications[app].units:
+        # Save the IP addresses of the units.
+        unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name))
+
+        # Save the storage ID of the primary unit, to reattach it later.
+        if await unit.is_leader_from_status():
+            primary_storage = storage_id(ops_test, unit.name)
+
+    logger.info(f"getting the storage id of app: {SECOND_APPLICATION}")
+    second_storage = ""
+    for unit in ops_test.model.applications[SECOND_APPLICATION].units:
+        if await unit.is_leader_from_status():
+            second_storage = storage_id(ops_test, unit.name)
+            break
+
+    # Scale both databases to zero units.
+    logger.info("scaling database to zero units")
+    await scale_application(ops_test, app, 0)
+    await scale_application(ops_test, SECOND_APPLICATION, 0)
+
+    # Check that all units are down: Patroni's REST API should be unreachable.
+    for unit_ip in unit_ip_addresses:
+        try:
+            resp = requests.get(f"http://{unit_ip}:8008", timeout=10)
+        except requests.exceptions.ConnectionError:
+            # A refused connection is the expected outcome for a stopped unit.
+            continue
+        except Exception as e:
+            raise AssertionError(
+                f"unexpected error reaching http://{unit_ip}:8008: {e}"
+            ) from e
+        assert (
+            resp.status_code != 200
+        ), f"unit http://{unit_ip}:8008 is still up (status code = {resp.status_code})"
+
+    # Scale up to one unit.
+    logger.info("scaling database to one unit")
+    await add_unit_with_storage(ops_test, app=app, storage=primary_storage)
+    await ops_test.model.wait_for_idle(
+        apps=[APP_NAME, APPLICATION_NAME], status="active", timeout=1500
+    )
+
+    connection_string, _ = await get_db_connection(ops_test, dbname=dbname)
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    logger.info("check test database data")
+    await validate_test_data(connection_string)
+
+    logger.info("scaling database up to two units using third-party cluster storage")
+    new_unit = await add_unit_with_storage(
+        ops_test,
+        app=app,
+        storage=second_storage,
+        is_blocked=True,
+        blocked_message=FOREIGN_CLUSTER_STORAGE_BLOCKING_MESSAGE,
+    )
+
+    logger.info(f"removing unit {new_unit.name}, which uses the storage of application {SECOND_APPLICATION}")
+    await ops_test.model.destroy_units(new_unit.name)
+
+    await are_writes_increasing(ops_test)
+
+    logger.info("check test database data")
+    await validate_test_data(connection_string)
+
+    # Scale up to two units.
+    logger.info("scaling database to two units")
+    prev_units = [unit.name for unit in ops_test.model.applications[app].units]
+    await scale_application(ops_test, application_name=app, count=2)
+    unit = await get_last_added_unit(ops_test, app, prev_units)
+
+    logger.info(f"check test database data of unit name {unit.name}")
+    connection_string, _ = await get_db_connection(
+        ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name
+    )
+    await validate_test_data(connection_string)
+    assert await reused_replica_storage(
+        ops_test, unit_name=unit.name
+    ), "attached storage not properly re-used by Postgresql."
+
+    # Scale up to three units.
+    logger.info("scaling database to three units")
+    prev_units = [unit.name for unit in ops_test.model.applications[app].units]
+    await scale_application(ops_test, application_name=app, count=3)
+    unit = await get_last_added_unit(ops_test, app, prev_units)
+
+    logger.info(f"check test database data of unit name {unit.name}")
+    connection_string, _ = await get_db_connection(
+        ops_test, dbname=dbname, is_primary=False, replica_unit_name=unit.name
+    )
+    await validate_test_data(connection_string)
+    assert await reused_replica_storage(
+        ops_test, unit_name=unit.name
+    ), "attached storage not properly re-used by Postgresql."
+
+    await check_writes(ops_test)
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 7ab182447f..2e6ed29d46 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -1413,7 +1413,11 @@ def test_on_peer_relation_changed(harness):
     harness.update_relation_data(
         rel_id,
         harness.charm.app.name,
-        {"cluster_initialised": "True", "members_ips": '["1.1.1.1"]'},
+        {
+            "cluster_initialised": "True",
+            "members_ips": '["1.1.1.1"]',
+            "database-version": "14",
+        },
     )
     harness.set_leader()
     _reconfigure_cluster.return_value = False