Skip to content

Commit 448a1a6

Browse files
[DPE-2953] Cross-region async replication (#452)
* Add async replication implementation Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Backup standby pgdata folder Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Fix OS call Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Fix unit tests Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Improve comments and logs Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Revert permission change Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Add optional type hint Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Add relation name to secret label and revert poetry.lock Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Reload Patroni configuration when member is not ready yet Signed-off-by: Marcelo Henrique Neppel <[email protected]> --------- Signed-off-by: Marcelo Henrique Neppel <[email protected]>
1 parent 35673fb commit 448a1a6

File tree

9 files changed

+967
-62
lines changed

9 files changed

+967
-62
lines changed

actions.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ list-backups:
1717
description: Lists backups in s3 storage.
1818
pre-upgrade-check:
1919
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
20+
promote-cluster:
21+
description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit.
22+
params:
23+
force-promotion:
24+
type: boolean
25+
description: Force the promotion of a cluster when there is already a primary cluster.
2026
restore:
2127
description: Restore a database backup using pgBackRest.
2228
S3 credentials are retrieved from a relation with the S3 integrator charm.

lib/charms/postgresql_k8s/v0/postgresql.py

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
1919
Any charm using this library should import the `psycopg2` or `psycopg2-binary` dependency.
2020
"""
21+
2122
import logging
2223
from collections import OrderedDict
2324
from typing import Dict, List, Optional, Set, Tuple
@@ -35,7 +36,7 @@
3536

3637
# Increment this PATCH version before using `charmcraft publish-lib` or reset
3738
# to 0 if you are raising the major API version
38-
LIBPATCH = 24
39+
LIBPATCH = 26
3940

4041
INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"
4142

@@ -358,9 +359,7 @@ def _generate_database_privileges_statements(
358359
statements.append(
359360
"""UPDATE pg_catalog.pg_largeobject_metadata
360361
SET lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}')
361-
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(
362-
user, self.user
363-
)
362+
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(user, self.user)
364363
)
365364
else:
366365
for schema in schemas:
@@ -477,11 +476,11 @@ def set_up_database(self) -> None:
477476
"""Set up postgres database with the right permissions."""
478477
connection = None
479478
try:
480-
self.create_user(
481-
"admin",
482-
extra_user_roles="pg_read_all_data,pg_write_all_data",
483-
)
484479
with self._connect_to_database() as connection, connection.cursor() as cursor:
480+
cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';")
481+
if cursor.fetchone() is not None:
482+
return
483+
485484
# Allow access to the postgres database only to the system users.
486485
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;")
487486
cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;")
@@ -491,6 +490,10 @@ def set_up_database(self) -> None:
491490
sql.Identifier(user)
492491
)
493492
)
493+
self.create_user(
494+
"admin",
495+
extra_user_roles="pg_read_all_data,pg_write_all_data",
496+
)
494497
cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;")
495498
except psycopg2.Error as e:
496499
logger.error(f"Failed to set up databases: {e}")
@@ -562,18 +565,16 @@ def build_postgresql_parameters(
562565
parameters = {}
563566
for config, value in config_options.items():
564567
# Filter config option not related to PostgreSQL parameters.
565-
if not config.startswith(
566-
(
567-
"durability",
568-
"instance",
569-
"logging",
570-
"memory",
571-
"optimizer",
572-
"request",
573-
"response",
574-
"vacuum",
575-
)
576-
):
568+
if not config.startswith((
569+
"durability",
570+
"instance",
571+
"logging",
572+
"memory",
573+
"optimizer",
574+
"request",
575+
"response",
576+
"vacuum",
577+
)):
577578
continue
578579
parameter = "_".join(config.split("_")[1:])
579580
if parameter in ["date_style", "time_zone"]:
@@ -594,8 +595,8 @@ def build_postgresql_parameters(
594595
# and the remaining as cache memory.
595596
shared_buffers = int(available_memory * 0.25)
596597
effective_cache_size = int(available_memory - shared_buffers)
597-
parameters.setdefault("shared_buffers", f"{int(shared_buffers/10**6)}MB")
598-
parameters.update({"effective_cache_size": f"{int(effective_cache_size/10**6)}MB"})
598+
parameters.setdefault("shared_buffers", f"{int(shared_buffers / 10**6)}MB")
599+
parameters.update({"effective_cache_size": f"{int(effective_cache_size / 10**6)}MB"})
599600
else:
600601
# Return default
601602
parameters.setdefault("shared_buffers", "128MB")

metadata.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ peers:
2626
interface: upgrade
2727

2828
provides:
29+
async-primary:
30+
interface: async_replication
31+
limit: 1
32+
optional: true
2933
database:
3034
interface: postgresql_client
3135
db:
@@ -37,6 +41,10 @@ provides:
3741
limit: 1
3842

3943
requires:
44+
async-replica:
45+
interface: async_replication
46+
limit: 1
47+
optional: true
4048
certificates:
4149
interface: tls-certificates
4250
limit: 1

src/charm.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
USER,
8787
USER_PASSWORD_KEY,
8888
)
89+
from relations.async_replication import PostgreSQLAsyncReplication
8990
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
9091
from relations.postgresql_provider import PostgreSQLProvider
9192
from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model
@@ -166,6 +167,7 @@ def __init__(self, *args):
166167
self.legacy_db_admin_relation = DbProvides(self, admin=True)
167168
self.backup = PostgreSQLBackups(self, "s3-parameters")
168169
self.tls = PostgreSQLTLS(self, PEER)
170+
self.async_replication = PostgreSQLAsyncReplication(self)
169171
self.restart_manager = RollingOpsManager(
170172
charm=self, relation="restart", callback=self._restart
171173
)
@@ -321,6 +323,8 @@ def primary_endpoint(self) -> Optional[str]:
321323
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
322324
with attempt:
323325
primary = self._patroni.get_primary()
326+
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
327+
primary = standby_leader
324328
primary_endpoint = self._patroni.get_member_ip(primary)
325329
# Force a retry if there is no primary or the member that was
326330
# returned is not in the list of the current cluster members
@@ -420,6 +424,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
420424
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)
421425
return
422426

427+
# Update the sync-standby endpoint in the async replication data.
428+
self.async_replication.update_async_replication_data()
429+
423430
def _on_pgdata_storage_detaching(self, _) -> None:
424431
# Change the primary if it's the unit that is being removed.
425432
try:
@@ -513,9 +520,13 @@ def _on_peer_relation_changed(self, event: HookEvent):
513520

514521
# Restart the workload if it's stuck on the starting state after a timeline divergence
515522
# due to a backup that was restored.
516-
if not self.is_primary and (
517-
self._patroni.member_replication_lag == "unknown"
518-
or int(self._patroni.member_replication_lag) > 1000
523+
if (
524+
not self.is_primary
525+
and not self.is_standby_leader
526+
and (
527+
self._patroni.member_replication_lag == "unknown"
528+
or int(self._patroni.member_replication_lag) > 1000
529+
)
519530
):
520531
self._patroni.reinitialize_postgresql()
521532
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
@@ -551,8 +562,7 @@ def _update_new_unit_status(self) -> None:
551562
# a failed switchover, so wait until the primary is elected.
552563
if self.primary_endpoint:
553564
self._update_relation_endpoints()
554-
if not self.is_blocked:
555-
self.unit.status = ActiveStatus()
565+
self.async_replication.handle_read_only_mode()
556566
else:
557567
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)
558568

@@ -688,6 +698,7 @@ def _hosts(self) -> set:
688698
def _patroni(self) -> Patroni:
689699
"""Returns an instance of the Patroni object."""
690700
return Patroni(
701+
self,
691702
self._unit_ip,
692703
self.cluster_name,
693704
self._member_name,
@@ -704,6 +715,11 @@ def is_primary(self) -> bool:
704715
"""Return whether this unit is the primary instance."""
705716
return self.unit.name == self._patroni.get_primary(unit_name_pattern=True)
706717

718+
@property
719+
def is_standby_leader(self) -> bool:
720+
"""Return whether this unit is the standby leader instance."""
721+
return self.unit.name == self._patroni.get_standby_leader(unit_name_pattern=True)
722+
707723
@property
708724
def is_tls_enabled(self) -> bool:
709725
"""Return whether TLS is enabled."""
@@ -902,6 +918,9 @@ def _on_config_changed(self, event) -> None:
902918
if self.is_blocked and "Configuration Error" in self.unit.status.message:
903919
self.unit.status = ActiveStatus()
904920

921+
# Update the sync-standby endpoint in the async replication data.
922+
self.async_replication.update_async_replication_data()
923+
905924
if not self.unit.is_leader():
906925
return
907926

@@ -929,6 +948,9 @@ def enable_disable_extensions(self, database: str = None) -> None:
929948
Args:
930949
database: optional database where to enable/disable the extension.
931950
"""
951+
if self._patroni.get_primary() is None:
952+
logger.debug("Early exit enable_disable_extensions: standby cluster")
953+
return
932954
spi_module = ["refint", "autoinc", "insert_username", "moddatetime"]
933955
plugins_exception = {"uuid_ossp": '"uuid-ossp"'}
934956
original_status = self.unit.status
@@ -1188,6 +1210,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
11881210
# Other units Patroni configuration will be reloaded in the peer relation changed event.
11891211
self.update_config()
11901212

1213+
# Update the password in the async replication data.
1214+
self.async_replication.update_async_replication_data()
1215+
11911216
event.set_results({"password": password})
11921217

11931218
def _on_update_status(self, _) -> None:
@@ -1225,6 +1250,9 @@ def _on_update_status(self, _) -> None:
12251250
if self._handle_workload_failures():
12261251
return
12271252

1253+
# Update the sync-standby endpoint in the async replication data.
1254+
self.async_replication.update_async_replication_data()
1255+
12281256
self._set_primary_status_message()
12291257

12301258
# Restart topology observer if it is gone
@@ -1270,8 +1298,16 @@ def _handle_workload_failures(self) -> bool:
12701298
a bool indicating whether the charm performed any action.
12711299
"""
12721300
# Restart the workload if it's stuck on the starting state after a restart.
1301+
try:
1302+
is_primary = self.is_primary
1303+
is_standby_leader = self.is_standby_leader
1304+
except RetryError:
1305+
return False
1306+
12731307
if (
1274-
not self._patroni.member_started
1308+
not is_primary
1309+
and not is_standby_leader
1310+
and not self._patroni.member_started
12751311
and "postgresql_restarted" in self._peers.data[self.unit]
12761312
and self._patroni.member_replication_lag == "unknown"
12771313
):
@@ -1291,6 +1327,8 @@ def _set_primary_status_message(self) -> None:
12911327
try:
12921328
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
12931329
self.unit.status = ActiveStatus("Primary")
1330+
elif self.is_standby_leader:
1331+
self.unit.status = ActiveStatus("Standby Leader")
12941332
elif self._patroni.member_started:
12951333
self.unit.status = ActiveStatus()
12961334
except (RetryError, ConnectionError) as e:

0 commit comments

Comments
 (0)