Skip to content

Commit a627e0b

Browse files
[DPE-4256] Async replication UX improvements (#491)
1 parent c996dd3 commit a627e0b

File tree

13 files changed

+353
-189
lines changed

13 files changed

+353
-189
lines changed

actions.yaml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ create-backup:
1111
Differential backup is a copy only of changed data since the last full backup.
1212
Incremental backup is a copy only of changed data since the last backup (any type).
1313
Possible values - full, differential, incremental.
14+
create-replication:
15+
description: Set up asynchronous replication between two clusters.
16+
params:
17+
name:
18+
type: string
19+
description: The name of the replication (defaults to 'default').
20+
default: default
1421
get-primary:
1522
description: Get the unit with is the primary/leader in the replication.
1623
get-password:
@@ -25,10 +32,10 @@ list-backups:
2532
description: Lists backups in s3 storage in AWS.
2633
pre-upgrade-check:
2734
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
28-
promote-cluster:
35+
promote-to-primary:
2936
description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit.
3037
params:
31-
force-promotion:
38+
force:
3239
type: boolean
3340
description: Force the promotion of a cluster when there is already a primary cluster.
3441
restore:

lib/charms/postgresql_k8s/v0/postgresql.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636

3737
# Increment this PATCH version before using `charmcraft publish-lib` or reset
3838
# to 0 if you are raising the major API version
39-
LIBPATCH = 26
39+
LIBPATCH = 27
4040

4141
INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"
4242

@@ -111,20 +111,19 @@ def __init__(
111111
self.system_users = system_users
112112

113113
def _connect_to_database(
114-
self, database: str = None, connect_to_current_host: bool = False
114+
self, database: str = None, database_host: str = None
115115
) -> psycopg2.extensions.connection:
116116
"""Creates a connection to the database.
117117
118118
Args:
119119
database: database to connect to (defaults to the database
120120
provided when the object for this class was created).
121-
connect_to_current_host: whether to connect to the current host
122-
instead of the primary host.
121+
database_host: host to connect to instead of the primary host.
123122
124123
Returns:
125124
psycopg2 connection object.
126125
"""
127-
host = self.current_host if connect_to_current_host else self.primary_host
126+
host = database_host if database_host is not None else self.primary_host
128127
connection = psycopg2.connect(
129128
f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'"
130129
f"password='{self.password}' connect_timeout=1"
@@ -388,7 +387,7 @@ def get_postgresql_text_search_configs(self) -> Set[str]:
388387
Set of PostgreSQL text search configs.
389388
"""
390389
with self._connect_to_database(
391-
connect_to_current_host=True
390+
database_host=self.current_host
392391
) as connection, connection.cursor() as cursor:
393392
cursor.execute("SELECT CONCAT('pg_catalog.', cfgname) FROM pg_ts_config;")
394393
text_search_configs = cursor.fetchall()
@@ -401,7 +400,7 @@ def get_postgresql_timezones(self) -> Set[str]:
401400
Set of PostgreSQL timezones.
402401
"""
403402
with self._connect_to_database(
404-
connect_to_current_host=True
403+
database_host=self.current_host
405404
) as connection, connection.cursor() as cursor:
406405
cursor.execute("SELECT name FROM pg_timezone_names;")
407406
timezones = cursor.fetchall()
@@ -434,7 +433,7 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool:
434433
"""
435434
try:
436435
with self._connect_to_database(
437-
connect_to_current_host=check_current_host
436+
database_host=self.current_host if check_current_host else None
438437
) as connection, connection.cursor() as cursor:
439438
cursor.execute("SHOW ssl;")
440439
return "on" in cursor.fetchone()[0]
@@ -502,19 +501,24 @@ def set_up_database(self) -> None:
502501
if connection is not None:
503502
connection.close()
504503

505-
def update_user_password(self, username: str, password: str) -> None:
504+
def update_user_password(
505+
self, username: str, password: str, database_host: str = None
506+
) -> None:
506507
"""Update a user password.
507508
508509
Args:
509510
username: the user to update the password.
510511
password: the new password for the user.
512+
database_host: the host to connect to.
511513
512514
Raises:
513515
PostgreSQLUpdateUserPasswordError if the password couldn't be changed.
514516
"""
515517
connection = None
516518
try:
517-
with self._connect_to_database() as connection, connection.cursor() as cursor:
519+
with self._connect_to_database(
520+
database_host=database_host
521+
) as connection, connection.cursor() as cursor:
518522
cursor.execute(
519523
sql.SQL("ALTER USER {} WITH ENCRYPTED PASSWORD '" + password + "';").format(
520524
sql.Identifier(username)
@@ -610,7 +614,7 @@ def validate_date_style(self, date_style: str) -> bool:
610614
"""
611615
try:
612616
with self._connect_to_database(
613-
connect_to_current_host=True
617+
database_host=self.current_host
614618
) as connection, connection.cursor() as cursor:
615619
cursor.execute(
616620
sql.SQL(

metadata.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ peers:
3939
interface: upgrade
4040

4141
provides:
42-
async-primary:
43-
interface: async_replication
42+
replication-offer:
43+
interface: postgresql_async
4444
limit: 1
4545
optional: true
4646
database:
@@ -55,8 +55,8 @@ provides:
5555
interface: grafana_dashboard
5656

5757
requires:
58-
async-replica:
59-
interface: async_replication
58+
replication:
59+
interface: postgresql_async
6060
limit: 1
6161
optional: true
6262
certificates:

src/backups.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed
2727

2828
from constants import BACKUP_TYPE_OVERRIDES, BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER
29-
from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION
29+
from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION
3030

3131
logger = logging.getLogger(__name__)
3232

@@ -810,8 +810,8 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
810810

811811
logger.info("Checking that the cluster is not replicating data to a standby cluster")
812812
for relation in [
813-
self.model.get_relation(ASYNC_REPLICA_RELATION),
814-
self.model.get_relation(ASYNC_PRIMARY_RELATION),
813+
self.model.get_relation(REPLICATION_CONSUMER_RELATION),
814+
self.model.get_relation(REPLICATION_OFFER_RELATION),
815815
]:
816816
if not relation:
817817
continue

src/charm.py

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@
8282
WORKLOAD_OS_USER,
8383
)
8484
from patroni import NotReadyError, Patroni, SwitchoverFailedError
85-
from relations.async_replication import PostgreSQLAsyncReplication
85+
from relations.async_replication import (
86+
REPLICATION_CONSUMER_RELATION,
87+
REPLICATION_OFFER_RELATION,
88+
PostgreSQLAsyncReplication,
89+
)
8690
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
8791
from relations.postgresql_provider import PostgreSQLProvider
8892
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
@@ -844,7 +848,7 @@ def _set_active_status(self):
844848
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
845849
self.unit.status = ActiveStatus("Primary")
846850
elif self.is_standby_leader:
847-
self.unit.status = ActiveStatus("Standby Leader")
851+
self.unit.status = ActiveStatus("Standby")
848852
elif self._patroni.member_started:
849853
self.unit.status = ActiveStatus()
850854
except (RetryError, ConnectionError) as e:
@@ -1072,15 +1076,42 @@ def _on_set_password(self, event: ActionEvent) -> None:
10721076
)
10731077
return
10741078

1075-
# Update the password in the PostgreSQL instance.
1076-
try:
1077-
self.postgresql.update_user_password(username, password)
1078-
except PostgreSQLUpdateUserPasswordError as e:
1079-
logger.exception(e)
1079+
replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION)
1080+
if (
1081+
replication_offer_relation is not None
1082+
and not self.async_replication.is_primary_cluster()
1083+
):
1084+
# Update the password in the other cluster PostgreSQL primary instance.
1085+
other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints()
1086+
other_cluster_primary = self._patroni.get_primary(
1087+
alternative_endpoints=other_cluster_endpoints
1088+
)
1089+
other_cluster_primary_ip = [
1090+
replication_offer_relation.data[unit].get("private-address")
1091+
for unit in replication_offer_relation.units
1092+
if unit.name.replace("/", "-") == other_cluster_primary
1093+
][0]
1094+
try:
1095+
self.postgresql.update_user_password(
1096+
username, password, database_host=other_cluster_primary_ip
1097+
)
1098+
except PostgreSQLUpdateUserPasswordError as e:
1099+
logger.exception(e)
1100+
event.fail("Failed changing the password.")
1101+
return
1102+
elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None:
10801103
event.fail(
1081-
"Failed changing the password: Not all members healthy or finished initial sync."
1104+
"Failed changing the password: This action can be ran only in the cluster from the offer side."
10821105
)
10831106
return
1107+
else:
1108+
# Update the password in this cluster PostgreSQL primary instance.
1109+
try:
1110+
self.postgresql.update_user_password(username, password)
1111+
except PostgreSQLUpdateUserPasswordError as e:
1112+
logger.exception(e)
1113+
event.fail("Failed changing the password.")
1114+
return
10841115

10851116
# Update the password in the secret store.
10861117
self.set_secret(APP_SCOPE, f"{username}-password", password)
@@ -1089,9 +1120,6 @@ def _on_set_password(self, event: ActionEvent) -> None:
10891120
# Other units Patroni configuration will be reloaded in the peer relation changed event.
10901121
self.update_config()
10911122

1092-
# Update the password in the async replication data.
1093-
self.async_replication.update_async_replication_data()
1094-
10951123
event.set_results({"password": password})
10961124

10971125
def _on_get_primary(self, event: ActionEvent) -> None:

src/patroni.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,6 @@ def _patroni_url(self) -> str:
8888
"""Patroni REST API URL."""
8989
return f"{'https' if self._tls_enabled else 'http'}://{self._endpoint}:8008"
9090

91-
# def configure_standby_cluster(self, host: str) -> None:
92-
# """Configure this cluster as a standby cluster."""
93-
# requests.patch(
94-
# f"{self._patroni_url}/config",
95-
# verify=self._verify,
96-
# json={
97-
# "standby_cluster": {
98-
# "create_replica_methods": ["basebackup"],
99-
# "host": host,
100-
# "port": 5432,
101-
# }
102-
# },
103-
# )
104-
10591
@property
10692
def rock_postgresql_version(self) -> Optional[str]:
10793
"""Version of Postgresql installed in the Rock image."""
@@ -112,12 +98,18 @@ def rock_postgresql_version(self) -> Optional[str]:
11298
snap_meta = container.pull("/meta.charmed-postgresql/snap.yaml")
11399
return yaml.safe_load(snap_meta)["version"]
114100

115-
def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
101+
def _get_alternative_patroni_url(
102+
self, attempt: AttemptManager, alternative_endpoints: List[str] = None
103+
) -> str:
116104
"""Get an alternative REST API URL from another member each time.
117105
118106
When the Patroni process is not running in the current unit it's needed
119107
to use a URL from another cluster member REST API to do some operations.
120108
"""
109+
if alternative_endpoints is not None:
110+
return self._patroni_url.replace(
111+
self._endpoint, alternative_endpoints[attempt.retry_state.attempt_number - 1]
112+
)
121113
if attempt.retry_state.attempt_number > 1:
122114
url = self._patroni_url.replace(
123115
self._endpoint, list(self._endpoints)[attempt.retry_state.attempt_number - 2]
@@ -126,11 +118,12 @@ def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
126118
url = self._patroni_url
127119
return url
128120

129-
def get_primary(self, unit_name_pattern=False) -> str:
121+
def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str:
130122
"""Get primary instance.
131123
132124
Args:
133125
unit_name_pattern: whether or not to convert pod name to unit name
126+
alternative_endpoints: list of alternative endpoints to check for the primary.
134127
135128
Returns:
136129
primary pod or unit name.
@@ -139,8 +132,8 @@ def get_primary(self, unit_name_pattern=False) -> str:
139132
# Request info from cluster endpoint (which returns all members of the cluster).
140133
for attempt in Retrying(stop=stop_after_attempt(len(self._endpoints) + 1)):
141134
with attempt:
142-
url = self._get_alternative_patroni_url(attempt)
143-
r = requests.get(f"{url}/cluster", verify=self._verify)
135+
url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
136+
r = requests.get(f"{url}/cluster", verify=self._verify, timeout=5)
144137
for member in r.json()["members"]:
145138
if member["role"] == "leader":
146139
primary = member["name"]

0 commit comments

Comments
 (0)