Skip to content

Commit 842aa47

Browse files
authored
[DPE-5827] Set all nodes to synchronous replicas (#784)
* Update patroni configuration * Update test assertion * Copy update_synchronous_node_count from VM * Add unit test * Set sync node count during upgrade * Fix tls test * Switchover primary * Add different helper to get leader * Add config boilerplate * Use config value when setting sync node count * Escape tuple * Add policy values * Add integration test * Fix casting * Fix test * Update to spec * Bump retry timout * Switch to planned units * Fix generator * Update conf description * Spread task * Pass the charm
1 parent 27892b0 commit 842aa47

File tree

16 files changed

+249
-17
lines changed

16 files changed

+249
-17
lines changed

config.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
# See LICENSE file for licensing details.
33

44
options:
5+
synchronous_node_count:
6+
description: |
7+
Sets the number of synchronous nodes to be maintained in the cluster. Should be
8+
either "all", "majority" or a positive integer value.
9+
type: string
10+
default: "all"
511
durability_synchronous_commit:
612
description: |
713
Sets the current transactions synchronization level. This charm allows only the

src/charm.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,13 +470,22 @@ def get_unit_ip(self, unit: Unit) -> str | None:
470470
else:
471471
return None
472472

473+
def updated_synchronous_node_count(self) -> bool:
474+
"""Tries to update synchronous_node_count configuration and reports the result."""
475+
try:
476+
self._patroni.update_synchronous_node_count()
477+
return True
478+
except RetryError:
479+
logger.debug("Unable to set synchronous_node_count")
480+
return False
481+
473482
def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
474483
"""The leader removes the departing units from the list of cluster members."""
475484
# Allow leader to update endpoints if it isn't leaving.
476485
if not self.unit.is_leader() or event.departing_unit == self.unit:
477486
return
478487

479-
if not self.is_cluster_initialised:
488+
if not self.is_cluster_initialised or not self.updated_synchronous_node_count():
480489
logger.debug(
481490
"Deferring on_peer_relation_departed: Cluster must be initialized before members can leave"
482491
)
@@ -680,6 +689,10 @@ def _on_config_changed(self, event) -> None:
680689
self.unit.status = BlockedStatus("Configuration Error. Please check the logs")
681690
logger.error("Invalid configuration: %s", str(e))
682691
return
692+
if not self.updated_synchronous_node_count():
693+
logger.debug("Defer on_config_changed: unable to set synchronous node count")
694+
event.defer()
695+
return
683696

684697
if self.is_blocked and "Configuration Error" in self.unit.status.message:
685698
self._set_active_status()
@@ -693,6 +706,9 @@ def _on_config_changed(self, event) -> None:
693706
# Enable and/or disable the extensions.
694707
self.enable_disable_extensions()
695708

709+
self._unblock_extensions()
710+
711+
def _unblock_extensions(self) -> None:
696712
# Unblock the charm after extensions are enabled (only if it's blocked due to application
697713
# charms requesting extensions).
698714
if self.unit.status.message != EXTENSIONS_BLOCKING_MESSAGE:
@@ -803,6 +819,7 @@ def _add_members(self, event) -> None:
803819
for member in self._hosts - self._patroni.cluster_members:
804820
logger.debug("Adding %s to cluster", member)
805821
self.add_cluster_member(member)
822+
self._patroni.update_synchronous_node_count()
806823
except NotReadyError:
807824
logger.info("Deferring reconfigure: another member doing sync right now")
808825
event.defer()

src/config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,18 @@
55
"""Structured configuration for the PostgreSQL charm."""
66

77
import logging
8+
from typing import Literal
89

910
from charms.data_platform_libs.v0.data_models import BaseConfigModel
10-
from pydantic import validator
11+
from pydantic import PositiveInt, validator
1112

1213
logger = logging.getLogger(__name__)
1314

1415

1516
class CharmConfig(BaseConfigModel):
1617
"""Manager for the structured configuration."""
1718

19+
synchronous_node_count: Literal["all", "majority"] | PositiveInt
1820
durability_synchronous_commit: str | None
1921
instance_default_text_search_config: str | None
2022
instance_max_locks_per_transaction: int | None

src/patroni.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ class SwitchoverFailedError(Exception):
5353
"""Raised when a switchover failed for some reason."""
5454

5555

56+
class UpdateSyncNodeCountError(Exception):
57+
"""Raised when updating synchronous_node_count failed for some reason."""
58+
59+
5660
class Patroni:
5761
"""This class handles the communication with Patroni API and configuration files."""
5862

@@ -126,6 +130,36 @@ def _get_alternative_patroni_url(
126130
url = self._patroni_url
127131
return url
128132

133+
@property
134+
def _synchronous_node_count(self) -> int:
135+
planned_units = self._charm.app.planned_units()
136+
if self._charm.config.synchronous_node_count == "all":
137+
return planned_units - 1
138+
elif self._charm.config.synchronous_node_count == "majority":
139+
return planned_units // 2
140+
return (
141+
self._charm.config.synchronous_node_count
142+
if self._charm.config.synchronous_node_count < self._members_count - 1
143+
else planned_units - 1
144+
)
145+
146+
def update_synchronous_node_count(self) -> None:
147+
"""Update synchronous_node_count."""
148+
# Try to update synchronous_node_count.
149+
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
150+
with attempt:
151+
r = requests.patch(
152+
f"{self._patroni_url}/config",
153+
json={"synchronous_node_count": self._synchronous_node_count},
154+
verify=self._verify,
155+
auth=self._patroni_auth,
156+
timeout=PATRONI_TIMEOUT,
157+
)
158+
159+
# Check whether the update was unsuccessful.
160+
if r.status_code != 200:
161+
raise UpdateSyncNodeCountError(f"received {r.status_code}")
162+
129163
def get_primary(
130164
self, unit_name_pattern=False, alternative_endpoints: list[str] | None = None
131165
) -> str:
@@ -525,7 +559,7 @@ def render_patroni_yml_file(
525559
restore_to_latest=restore_to_latest,
526560
stanza=stanza,
527561
restore_stanza=restore_stanza,
528-
minority_count=self._members_count // 2,
562+
synchronous_node_count=self._synchronous_node_count,
529563
version=self.rock_postgresql_version.split(".")[0],
530564
pg_parameters=parameters,
531565
primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(),

src/upgrade.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def _on_upgrade_changed(self, event) -> None:
152152
return
153153

154154
self.charm.update_config()
155+
self.charm.updated_synchronous_node_count()
155156

156157
def _on_upgrade_charm_check_legacy(self, event: UpgradeCharmEvent) -> None:
157158
if not self.peer_relation:

templates/patroni.yml.j2

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ bootstrap:
22
dcs:
33
synchronous_mode: true
44
failsafe_mode: true
5-
synchronous_node_count: {{ minority_count }}
5+
synchronous_node_count: {{ synchronous_node_count }}
66
postgresql:
77
use_pg_rewind: true
88
remove_data_directory_on_rewind_failure: true

tests/integration/ha_tests/helpers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,26 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> in
501501
return parameter_value
502502

503503

504+
async def get_leader(model: Model, application_name: str) -> str:
505+
"""Get the cluster leader name.
506+
507+
Args:
508+
model: the model instance.
509+
application_name: the name of the application to get the value for.
510+
511+
Returns:
512+
the name of the cluster leader.
513+
"""
514+
status = await model.get_status()
515+
first_unit_ip = next(
516+
unit for unit in status["applications"][application_name]["units"].values()
517+
)["address"]
518+
cluster = get_patroni_cluster(first_unit_ip)
519+
for member in cluster["members"]:
520+
if member["role"] == "leader":
521+
return member["name"]
522+
523+
504524
async def get_standby_leader(model: Model, application_name: str) -> str:
505525
"""Get the standby leader name.
506526
@@ -1145,3 +1165,24 @@ async def remove_unit_force(ops_test: OpsTest, num_units: int):
11451165
timeout=1000,
11461166
wait_for_exact_units=scale,
11471167
)
1168+
1169+
1170+
async def get_cluster_roles(
1171+
ops_test: OpsTest, unit_name: str
1172+
) -> dict[str, str | list[str] | None]:
1173+
"""Return a mapping of cluster roles ("primaries", "sync_standbys", "replicas") to member names."""
1174+
unit_ip = await get_unit_address(ops_test, unit_name)
1175+
members = {"replicas": [], "primaries": [], "sync_standbys": []}
1176+
member_list = get_patroni_cluster(unit_ip)["members"]
1177+
logger.info(f"Cluster members are: {member_list}")
1178+
for member in member_list:
1179+
role = member["role"]
1180+
name = "/".join(member["name"].rsplit("-", 1))
1181+
if role == "leader":
1182+
members["primaries"].append(name)
1183+
elif role == "sync_standby":
1184+
members["sync_standbys"].append(name)
1185+
else:
1186+
members["replicas"].append(name)
1187+
1188+
return members

tests/integration/ha_tests/test_async_replication.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
from .helpers import (
3131
are_writes_increasing,
3232
check_writes,
33+
get_leader,
3334
get_standby_leader,
34-
get_sync_standby,
3535
start_continuous_writes,
3636
)
3737

@@ -406,11 +406,11 @@ async def test_async_replication_failover_in_main_cluster(
406406
logger.info("checking whether writes are increasing")
407407
await are_writes_increasing(ops_test)
408408

409-
sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
410-
logger.info(f"Sync-standby: {sync_standby}")
411-
logger.info("deleting the sync-standby pod")
409+
primary = await get_leader(first_model, DATABASE_APP_NAME)
410+
logger.info(f"Primary: {primary}")
411+
logger.info("deleting the primary pod")
412412
client = Client(namespace=first_model.info.name)
413-
client.delete(Pod, name=sync_standby.replace("/", "-"))
413+
client.delete(Pod, name=primary.replace("/", "-"))
414414

415415
async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
416416
await gather(
@@ -423,9 +423,9 @@ async def test_async_replication_failover_in_main_cluster(
423423
)
424424

425425
# Check that the sync-standby unit is not the same as before.
426-
new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
427-
logger.info(f"New sync-standby: {new_sync_standby}")
428-
assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
426+
new_primary = await get_leader(first_model, DATABASE_APP_NAME)
427+
logger.info(f"New sync-standby: {new_primary}")
428+
assert new_primary != primary, "Sync-standby is the same as before"
429429

430430
logger.info("Ensure continuous_writes after the crashed unit")
431431
await are_writes_increasing(ops_test)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2024 Canonical Ltd.
3+
# See LICENSE file for licensing details.
4+
import pytest
5+
from pytest_operator.plugin import OpsTest
6+
from tenacity import Retrying, stop_after_attempt, wait_fixed
7+
8+
from ..helpers import app_name, build_and_deploy
9+
from .helpers import get_cluster_roles
10+
11+
12+
@pytest.mark.abort_on_fail
13+
async def test_build_and_deploy(ops_test: OpsTest, charm) -> None:
14+
"""Build and deploy three units of PostgreSQL."""
15+
wait_for_apps = False
16+
# It is possible for users to provide their own cluster for HA testing. Hence, check if there
17+
# is a pre-existing cluster.
18+
if not await app_name(ops_test):
19+
wait_for_apps = True
20+
await build_and_deploy(ops_test, charm, 3, wait_for_idle=False)
21+
22+
if wait_for_apps:
23+
async with ops_test.fast_forward():
24+
await ops_test.model.wait_for_idle(status="active", timeout=1000, raise_on_error=False)
25+
26+
27+
async def test_default_all(ops_test: OpsTest) -> None:
28+
app = await app_name(ops_test)
29+
30+
async with ops_test.fast_forward():
31+
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)
32+
33+
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
34+
with attempt:
35+
roles = await get_cluster_roles(
36+
ops_test, ops_test.model.applications[app].units[0].name
37+
)
38+
39+
assert len(roles["primaries"]) == 1
40+
assert len(roles["sync_standbys"]) == 2
41+
assert len(roles["replicas"]) == 0
42+
43+
44+
async def test_majority(ops_test: OpsTest) -> None:
45+
app = await app_name(ops_test)
46+
47+
await ops_test.model.applications[app].set_config({"synchronous_node_count": "majority"})
48+
49+
async with ops_test.fast_forward():
50+
await ops_test.model.wait_for_idle(apps=[app], status="active")
51+
52+
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
53+
with attempt:
54+
roles = await get_cluster_roles(
55+
ops_test, ops_test.model.applications[app].units[0].name
56+
)
57+
58+
assert len(roles["primaries"]) == 1
59+
assert len(roles["sync_standbys"]) == 1
60+
assert len(roles["replicas"]) == 1
61+
62+
63+
async def test_constant(ops_test: OpsTest) -> None:
64+
app = await app_name(ops_test)
65+
66+
await ops_test.model.applications[app].set_config({"synchronous_node_count": "2"})
67+
68+
async with ops_test.fast_forward():
69+
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)
70+
71+
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
72+
with attempt:
73+
roles = await get_cluster_roles(
74+
ops_test, ops_test.model.applications[app].units[0].name
75+
)
76+
77+
assert len(roles["primaries"]) == 1
78+
assert len(roles["sync_standbys"]) == 2
79+
assert len(roles["replicas"]) == 0

tests/integration/helpers.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -754,15 +754,14 @@ async def switchover(
754754
)
755755
assert response.status_code == 200, f"Switchover status code is {response.status_code}"
756756
app_name = current_primary.split("/")[0]
757-
minority_count = len(ops_test.model.applications[app_name].units) // 2
758757
for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
759758
with attempt:
760759
response = requests.get(f"http://{primary_ip}:8008/cluster")
761760
assert response.status_code == 200
762761
standbys = len([
763762
member for member in response.json()["members"] if member["role"] == "sync_standby"
764763
])
765-
assert standbys >= minority_count
764+
assert standbys == len(ops_test.model.applications[app_name].units) - 1
766765

767766

768767
async def wait_for_idle_on_blocked(

0 commit comments

Comments
 (0)