Skip to content

Commit a18b1d3

Browse files
committed
handle error: storage belongs to different cluster
1 parent 927ad24 commit a18b1d3

File tree

5 files changed

+104
-24
lines changed

5 files changed

+104
-24
lines changed

src/charm.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,11 @@ def _on_peer_relation_changed(self, event: HookEvent):
478478
try:
479479
# Update the members of the cluster in the Patroni configuration on this unit.
480480
self.update_config()
481+
if self._patroni.cluster_system_id_mismatch(unit_name=self.unit.name):
482+
self.unit.status = BlockedStatus(
483+
"Failed to start postgresql. The storage belongs to a third-party cluster"
484+
)
485+
return
481486
except RetryError:
482487
self.unit.status = BlockedStatus("failed to update cluster members on member")
483488
return

src/cluster.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
"""Helper class used to manage cluster lifecycle."""
66

7+
import glob
78
import logging
89
import os
910
import pwd
@@ -647,3 +648,39 @@ def update_synchronous_node_count(self, units: int = None) -> None:
647648
# Check whether the update was unsuccessful.
648649
if r.status_code != 200:
649650
raise UpdateSyncNodeCountError(f"received {r.status_code}")
651+
652+
def cluster_system_id_mismatch(self, unit_name: str) -> bool:
653+
"""Check if the Patroni service is down.
654+
655+
Specifically, check whether its logs contain the error indicating that the storage belongs to a third-party cluster.
656+
657+
Returns:
658+
True if an error occurred because the storage belongs to a different cluster.
659+
"""
660+
last_log_file = self._last_patroni_log_file()
661+
unit_name = unit_name.replace("/", "-")
662+
if (
663+
f" CRITICAL: system ID mismatch, node {unit_name} belongs to a different cluster:"
664+
in last_log_file
665+
):
666+
return True
667+
return False
668+
669+
def _last_patroni_log_file(self) -> str:
670+
"""Get last log file content of Patroni service.
671+
672+
If there are no available log files, an empty string will be returned.
673+
674+
Returns:
675+
Content of last log file of Patroni service.
676+
"""
677+
log_files = glob.glob(f"{PATRONI_LOGS_PATH}/*.log")
678+
if len(log_files) == 0:
679+
return ""
680+
latest_file = max(log_files, key=os.path.getmtime)
681+
try:
682+
with open(latest_file) as last_log_file:
683+
return last_log_file.read()
684+
except OSError as e:
685+
logger.exception("Failed to read last patroni log file", exc_info=e)
686+
return ""

tests/integration/ha_tests/helpers.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ def storage_id(ops_test, unit_name):
807807
return line.split()[1]
808808

809809

810-
async def add_unit_with_storage(ops_test, app, storage):
810+
async def add_unit_with_storage(ops_test, app, storage, is_blocked: bool = False):
811811
"""Adds unit with storage.
812812
813813
Note: this function exists as a temporary solution until this issue is resolved:
@@ -820,7 +820,14 @@ async def add_unit_with_storage(ops_test, app, storage):
820820
return_code, _, _ = await ops_test.juju(*add_unit_cmd)
821821
assert return_code == 0, "Failed to add unit with storage"
822822
async with ops_test.fast_forward():
823-
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=2000)
823+
if is_blocked:
824+
application = ops_test.model.applications[app]
825+
await ops_test.model.block_until(
826+
lambda: "blocked" in {unit.workload_status for unit in application.units},
827+
timeout=1500,
828+
)
829+
else:
830+
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1500)
824831
assert (
825832
len(ops_test.model.applications[app].units) == expected_units
826833
), "New unit not added to model"

tests/integration/ha_tests/test_restore_cluster.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python3
22
# Copyright 2023 Canonical Ltd.
33
# See LICENSE file for licensing details.
4+
import asyncio
45
import logging
56

67
import pytest
@@ -37,24 +38,23 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
3738
charm = await ops_test.build_charm(".")
3839
async with ops_test.fast_forward():
3940
# Deploy the first cluster with reusable storage
40-
await ops_test.model.deploy(
41-
charm,
42-
application_name=FIRST_APPLICATION,
43-
num_units=3,
44-
series=CHARM_SERIES,
45-
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
46-
config={"profile": "testing"},
41+
await asyncio.gather(
42+
ops_test.model.deploy(
43+
charm,
44+
application_name=FIRST_APPLICATION,
45+
num_units=3,
46+
series=CHARM_SERIES,
47+
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
48+
config={"profile": "testing"},
49+
),
50+
ops_test.model.deploy(
51+
charm,
52+
application_name=SECOND_APPLICATION,
53+
num_units=1,
54+
series=CHARM_SERIES,
55+
config={"profile": "testing"},
56+
),
4757
)
48-
49-
# Deploy the second cluster
50-
await ops_test.model.deploy(
51-
charm,
52-
application_name=SECOND_APPLICATION,
53-
num_units=1,
54-
series=CHARM_SERIES,
55-
config={"profile": "testing"},
56-
)
57-
5858
await ops_test.model.wait_for_idle(status="active", timeout=1500)
5959

6060
# TODO have a better way to bootstrap clusters with existing storage

tests/integration/ha_tests/test_self_healing.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
validate_test_data,
5858
wait_network_restore,
5959
)
60+
from .test_restore_cluster import SECOND_APPLICATION
6061

6162
logger = logging.getLogger(__name__)
6263

@@ -79,7 +80,15 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
7980
async with ops_test.fast_forward():
8081
await ops_test.model.deploy(
8182
charm,
82-
num_units=3,
83+
num_units=1,
84+
series=CHARM_SERIES,
85+
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
86+
config={"profile": "testing"},
87+
)
88+
await ops_test.model.deploy(
89+
charm,
90+
num_units=1,
91+
application_name=SECOND_APPLICATION,
8392
series=CHARM_SERIES,
8493
storage={"pgdata": {"pool": "lxd-btrfs", "size": 2048}},
8594
config={"profile": "testing"},
@@ -552,10 +561,9 @@ async def test_network_cut_without_ip_change(
552561

553562

554563
@pytest.mark.group(1)
555-
async def test_deploy_zero_units(ops_test: OpsTest):
564+
async def test_deploy_zero_units(ops_test: OpsTest, charm):
556565
"""Scale the database to zero units and scale up again."""
557566
app = await app_name(ops_test)
558-
559567
dbname = f"{APPLICATION_NAME.replace('-', '_')}_first_database"
560568
connection_string, _ = await get_db_connection(ops_test, dbname=dbname)
561569

@@ -577,12 +585,20 @@ async def test_deploy_zero_units(ops_test: OpsTest):
577585
unit_ip_addresses.append(await get_unit_ip(ops_test, unit.name))
578586

579587
# Save detached storage ID
580-
if await unit.is_leader_from_status:
588+
if await unit.is_leader_from_status():
581589
primary_storage = storage_id(ops_test, unit.name)
582590

591+
logger.info(f"get storage id app: {SECOND_APPLICATION}")
592+
second_storage = ""
593+
for unit in ops_test.model.applications[SECOND_APPLICATION].units:
594+
if await unit.is_leader_from_status():
595+
second_storage = storage_id(ops_test, unit.name)
596+
break
597+
583598
# Scale the database to zero units.
584599
logger.info("scaling database to zero units")
585600
await scale_application(ops_test, app, 0)
601+
await scale_application(ops_test, SECOND_APPLICATION, 0)
586602

587603
# Checking shutdown units.
588604
for unit_ip in unit_ip_addresses:
@@ -599,7 +615,9 @@ async def test_deploy_zero_units(ops_test: OpsTest):
599615
# Scale up to one unit.
600616
logger.info("scaling database to one unit")
601617
await add_unit_with_storage(ops_test, app=app, storage=primary_storage)
602-
await ops_test.model.wait_for_idle(status="active", timeout=1500)
618+
await ops_test.model.wait_for_idle(
619+
apps=[APP_NAME, APPLICATION_NAME], status="active", timeout=1500
620+
)
603621

604622
connection_string, _ = await get_db_connection(ops_test, dbname=dbname)
605623
logger.info("checking whether writes are increasing")
@@ -608,6 +626,19 @@ async def test_deploy_zero_units(ops_test: OpsTest):
608626
logger.info("check test database data")
609627
await validate_test_data(connection_string)
610628

629+
logger.info("database scaling up to two units using third-party cluster storage")
630+
new_unit = await add_unit_with_storage(
631+
ops_test, app=app, storage=second_storage, is_blocked=True
632+
)
633+
634+
logger.info(f"remove unit {new_unit.name} with storage from application {SECOND_APPLICATION}")
635+
await ops_test.model.destroy_units(new_unit.name)
636+
637+
await are_writes_increasing(ops_test)
638+
639+
logger.info("check test database data")
640+
await validate_test_data(connection_string)
641+
611642
# Scale up to two units.
612643
logger.info("scaling database to two unit")
613644
prev_units = [unit.name for unit in ops_test.model.applications[app].units]

0 commit comments

Comments
 (0)