Skip to content

Commit b669f2a

Browse files
Zvirovyitaurus-forevermarceloneppel
authored
[DPE-7625] Add Logical Replication (#982)
* Migrated logical replication from VM. * Add logical replication integration test. * Lint fix. * Add logical replication integration test. * Improved saved resources info. * Review suggestions. * Fix pg_hba rules. * Fix unit tests. * Properly close psycopg connection. * Review suggestions. * Fix integration test. Fix bug with duplicating data. * Include fix from @marceloneppel. * Fix test_smoke.py Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Fix linting Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Try to make faster the check for all DBs down Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Logical replication config description. --------- Signed-off-by: Marcelo Henrique Neppel <[email protected]> Co-authored-by: Alex Lutay <[email protected]> Co-authored-by: Marcelo Henrique Neppel <[email protected]>
1 parent e2f9d48 commit b669f2a

File tree

15 files changed

+1551
-37
lines changed

15 files changed

+1551
-37
lines changed

config.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ options:
117117
Enables tracking of function call counts and time used. Specify pl to track only procedural-language functions
118118
type: string
119119
default: "none"
120+
logical_replication_subscription_request:
121+
description: |
122+
Mapping, in JSON format, of databases to lists of schema-qualified tables which will be requested from the
123+
publisher cluster to subscribe to via logical replication.
124+
Example: {"<database>": ["<schema>.<table>", ...], ...}
125+
type: string
126+
default: "{}"
120127
memory_maintenance_work_mem:
121128
description: |
122129
Sets the maximum memory (KB) to be used for maintenance operations.

lib/charms/postgresql_k8s/v0/postgresql.py

Lines changed: 369 additions & 2 deletions
Large diffs are not rendered by default.

metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ provides:
4949
interface: postgresql_async
5050
limit: 1
5151
optional: true
52+
logical-replication-offer:
53+
interface: postgresql_logical_replication
54+
optional: true
5255
database:
5356
interface: postgresql_client
5457
metrics-endpoint:
@@ -61,6 +64,10 @@ requires:
6164
interface: postgresql_async
6265
limit: 1
6366
optional: true
67+
logical-replication:
68+
interface: postgresql_logical_replication
69+
limit: 1
70+
optional: true
6471
certificates:
6572
interface: tls-certificates
6673
limit: 1

src/backups.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
WORKLOAD_OS_USER,
3636
)
3737
from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION
38+
from relations.logical_replication import (
39+
LOGICAL_REPLICATION_OFFER_RELATION,
40+
LOGICAL_REPLICATION_RELATION,
41+
)
3842

3943
logger = logging.getLogger(__name__)
4044

@@ -1088,7 +1092,7 @@ def _fetch_backup_from_id(self, backup_id: str) -> str:
10881092

10891093
return None
10901094

1091-
def _pre_restore_checks(self, event: ActionEvent) -> bool:
1095+
def _pre_restore_checks(self, event: ActionEvent) -> bool: # noqa: C901
10921096
"""Run some checks before starting the restore.
10931097
10941098
Returns:
@@ -1143,14 +1147,25 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
11431147
event.fail(error_message)
11441148
return False
11451149

1146-
logger.info("Checking that the cluster is not replicating data to a standby cluster")
1150+
logger.info("Checking that cluster does not have an active async replication relation")
11471151
for relation in [
11481152
self.model.get_relation(REPLICATION_CONSUMER_RELATION),
11491153
self.model.get_relation(REPLICATION_OFFER_RELATION),
11501154
]:
11511155
if not relation:
11521156
continue
1153-
error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster"
1157+
error_message = "Unit cannot restore backup with an active async replication relation"
1158+
logger.error(f"Restore failed: {error_message}")
1159+
event.fail(error_message)
1160+
return False
1161+
1162+
logger.info("Checking that cluster does not have an active logical replication relation")
1163+
if self.model.get_relation(LOGICAL_REPLICATION_RELATION) or len(
1164+
self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())
1165+
):
1166+
error_message = (
1167+
"Unit cannot restore backup with an active logical replication connection"
1168+
)
11541169
logger.error(f"Restore failed: {error_message}")
11551170
event.fail(error_message)
11561171
return False

src/charm.py

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
PostgreSQL,
5050
PostgreSQLEnableDisableExtensionError,
5151
PostgreSQLGetCurrentTimelineError,
52+
PostgreSQLListGroupsError,
5253
PostgreSQLUpdateUserPasswordError,
5354
)
5455
from charms.postgresql_k8s.v0.postgresql_tls import PostgreSQLTLS
@@ -134,6 +135,10 @@
134135
REPLICATION_OFFER_RELATION,
135136
PostgreSQLAsyncReplication,
136137
)
138+
from relations.logical_replication import (
139+
LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS,
140+
PostgreSQLLogicalReplication,
141+
)
137142
from relations.postgresql_provider import PostgreSQLProvider
138143
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
139144
from utils import any_cpu_to_cores, any_memory_to_bytes, new_password
@@ -250,6 +255,7 @@ def __init__(self, *args):
250255
self.ldap = PostgreSQLLDAP(self, "ldap")
251256
self.tls = PostgreSQLTLS(self, PEER, [self.primary_endpoint, self.replicas_endpoint])
252257
self.async_replication = PostgreSQLAsyncReplication(self)
258+
self.logical_replication = PostgreSQLLogicalReplication(self)
253259
self.restart_manager = RollingOpsManager(
254260
charm=self, relation="restart", callback=self._restart
255261
)
@@ -709,7 +715,7 @@ def _on_secret_changed(self, event: SecretChangedEvent) -> None:
709715
except PostgreSQLUpdateUserPasswordError:
710716
event.defer()
711717

712-
def _on_config_changed(self, event) -> None:
718+
def _on_config_changed(self, event) -> None: # noqa: C901
713719
"""Handle configuration changes, like enabling plugins."""
714720
if not self.is_cluster_initialised:
715721
logger.debug("Defer on_config_changed: cluster not initialised yet")
@@ -744,6 +750,9 @@ def _on_config_changed(self, event) -> None:
744750
# Update the sync-standby endpoint in the async replication data.
745751
self.async_replication.update_async_replication_data()
746752

753+
if not self.logical_replication.apply_changed_config(event):
754+
return
755+
747756
if not self.unit.is_leader():
748757
return
749758

@@ -1097,6 +1106,12 @@ def _set_active_status(self):
10971106
self.app_peer_data["s3-initialization-block-message"]
10981107
)
10991108
return
1109+
if self.unit.is_leader() and (
1110+
self.app_peer_data.get("logical-replication-validation") == "error"
1111+
or self.logical_replication.has_remote_publisher_errors()
1112+
):
1113+
self.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS)
1114+
return
11001115
if (
11011116
self._patroni.get_primary(unit_name_pattern=True) == self.unit.name
11021117
or self.is_standby_leader
@@ -1488,7 +1503,9 @@ def _on_update_status_early_exit_checks(self, container) -> bool:
14881503
self._check_pgdata_storage_size()
14891504

14901505
if (
1491-
self._has_blocked_status and self.unit.status not in S3_BLOCK_MESSAGES
1506+
self._has_blocked_status
1507+
and self.unit.status not in S3_BLOCK_MESSAGES
1508+
and self.unit.status.message != LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS
14921509
) or self._has_non_restore_waiting_status:
14931510
# If charm was failing to disable plugin, try again and continue (user may have removed the objects)
14941511
if self.unit.status.message == EXTENSION_OBJECT_MESSAGE:
@@ -1565,6 +1582,8 @@ def _on_update_status(self, _) -> None:
15651582

15661583
self.backup.coordinate_stanza_fields()
15671584

1585+
self.logical_replication.retry_validations()
1586+
15681587
self._set_active_status()
15691588

15701589
def _was_restore_successful(self, container: Container, service: ServiceInfo) -> bool:
@@ -2078,6 +2097,8 @@ def update_config(self, is_creating_backup: bool = False) -> bool:
20782097
self.model.config, available_memory, limit_memory
20792098
)
20802099

2100+
replication_slots = self.logical_replication.replication_slots()
2101+
20812102
logger.info("Updating Patroni config file")
20822103
# Update and reload configuration based on TLS files availability.
20832104
self._patroni.render_patroni_yml_file(
@@ -2094,6 +2115,7 @@ def update_config(self, is_creating_backup: bool = False) -> bool:
20942115
restore_stanza=self.app_peer_data.get("restore-stanza"),
20952116
parameters=postgresql_parameters,
20962117
user_databases_map=self.relations_user_databases_map,
2118+
slots=replication_slots or None,
20972119
)
20982120

20992121
if not self._is_workload_running:
@@ -2133,6 +2155,8 @@ def update_config(self, is_creating_backup: bool = False) -> bool:
21332155
"wal_keep_size": self.config.durability_wal_keep_size,
21342156
})
21352157

2158+
self._patroni.ensure_slots_controller_by_patroni(replication_slots)
2159+
21362160
self._handle_postgresql_restart_need()
21372161
self._restart_metrics_service()
21382162
self._restart_ldap_sync_service()
@@ -2323,22 +2347,30 @@ def client_relations(self) -> list[Relation]:
23232347
@property
23242348
def relations_user_databases_map(self) -> dict:
23252349
"""Returns a user->databases map for all relations."""
2326-
if (
2327-
not self.is_cluster_initialised
2328-
or not self._patroni.member_started
2329-
or self.postgresql.list_access_groups(current_host=self.is_connectivity_enabled)
2330-
!= set(ACCESS_GROUPS)
2331-
):
2350+
try:
2351+
if (
2352+
not self.is_cluster_initialised
2353+
or not self._patroni.member_started
2354+
or self.postgresql.list_access_groups(current_host=self.is_connectivity_enabled)
2355+
!= set(ACCESS_GROUPS)
2356+
):
2357+
return {USER: "all", REPLICATION_USER: "all", REWIND_USER: "all"}
2358+
except PostgreSQLListGroupsError as e:
2359+
logger.warning(f"Failed to list access groups: {e}")
23322360
return {USER: "all", REPLICATION_USER: "all", REWIND_USER: "all"}
23332361
user_database_map = {}
23342362
for user in self.postgresql.list_users_from_relation(
23352363
current_host=self.is_connectivity_enabled
23362364
):
2337-
user_database_map[user] = ",".join(
2365+
databases = ",".join(
23382366
self.postgresql.list_accessible_databases_for_user(
23392367
user, current_host=self.is_connectivity_enabled
23402368
)
23412369
)
2370+
if databases:
2371+
user_database_map[user] = databases
2372+
else:
2373+
logger.debug(f"User {user} has no databases to connect to")
23422374

23432375
# Copy relations users directly instead of waiting for them to be created
23442376
for relation in self.model.relations[self.postgresql_client_relation.relation_name]:

src/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class CharmConfig(BaseConfigModel):
3535
logging_log_lock_waits: bool | None
3636
logging_log_min_duration_statement: int | None
3737
logging_track_functions: str | None
38+
logical_replication_subscription_request: str | None
3839
memory_maintenance_work_mem: int | None
3940
memory_max_prepared_transactions: int | None
4041
memory_shared_buffers: int | None

src/patroni.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,35 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any
456456
timeout=PATRONI_TIMEOUT,
457457
)
458458

459+
def ensure_slots_controller_by_patroni(self, slots: dict[str, str]) -> None:
460+
"""Synchronises slots controlled by Patroni with the provided state by removing unneeded slots and creating new ones.
461+
462+
Args:
463+
slots: dictionary of slots in the {slot: database} format.
464+
"""
465+
current_config = requests.get(
466+
f"{self._patroni_url}/config",
467+
verify=self._verify,
468+
timeout=PATRONI_TIMEOUT,
469+
auth=self._patroni_auth,
470+
)
471+
slots_patch: dict[str, dict[str, str] | None] = dict.fromkeys(
472+
current_config.json().get("slots", ())
473+
)
474+
for slot, database in slots.items():
475+
slots_patch[slot] = {
476+
"database": database,
477+
"plugin": "pgoutput",
478+
"type": "logical",
479+
}
480+
requests.patch(
481+
f"{self._patroni_url}/config",
482+
verify=self._verify,
483+
json={"slots": slots_patch},
484+
auth=self._patroni_auth,
485+
timeout=PATRONI_TIMEOUT,
486+
)
487+
459488
def promote_standby_cluster(self) -> None:
460489
"""Promote a standby cluster to be a regular cluster."""
461490
config_response = requests.get(
@@ -526,6 +555,7 @@ def render_patroni_yml_file(
526555
restore_to_latest: bool = False,
527556
parameters: dict[str, str] | None = None,
528557
user_databases_map: dict[str, str] | None = None,
558+
slots: dict[str, str] | None = None,
529559
) -> None:
530560
"""Render the Patroni configuration file.
531561
@@ -545,6 +575,7 @@ def render_patroni_yml_file(
545575
restore_to_latest: restore all the WAL transaction logs from the stanza.
546576
parameters: PostgreSQL parameters to be added to the postgresql.conf file.
547577
user_databases_map: map of databases to be accessible by each user.
578+
slots: replication slots (keys) with assigned database name (values).
548579
"""
549580
# Open the template patroni.yml file.
550581
with open("templates/patroni.yml.j2") as file:
@@ -584,6 +615,7 @@ def render_patroni_yml_file(
584615
ldap_parameters=self._dict_to_hba_string(ldap_params),
585616
patroni_password=self._patroni_password,
586617
user_databases_map=user_databases_map,
618+
slots=slots,
587619
)
588620
self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644)
589621

0 commit comments

Comments
 (0)