Skip to content

Commit 823e0a0

Browse files
Logical Replication (#924)
* Migrated logical replication from PG14. * Add logical replication integration test. * Typo fixes for logical replication. * Lint fix. * Lint fix. * Lint fix. * Lint fix. * Lint fix. * Merge fix * Lint fix. * Improved saved resources info. * Add pg_hba support for logical replication. * Properly close psycopg connection. * Review suggestions * Fix bug with duplicating data. * Include fix from @marceloneppel. * Retry slots retrieval and add controller name to Juju command in test Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Fix linting Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Increase test timeouts Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Increase test timeouts Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Test with smaller timeout and longer fast interval Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Use custom fast interval in more checks Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Add sleep call to test Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Add sleep call to test Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Add sleep call to test Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Add comments to trigger CI Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Change sleep time to try to tirigger CI Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Remove sleep calls Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Add relation users and their databases to pg_hba.conf when the database is still starting Signed-off-by: Marcelo Henrique Neppel <[email protected]> * Logical replication config description. --------- Signed-off-by: Marcelo Henrique Neppel <[email protected]> Co-authored-by: Marcelo Henrique Neppel <[email protected]>
1 parent f7de1f1 commit 823e0a0

File tree

13 files changed

+1615
-19
lines changed

13 files changed

+1615
-19
lines changed

config.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ options:
117117
Enables tracking of function call counts and time used. Specify pl to track only procedural-language functions
118118
type: string
119119
default: "none"
120+
logical_replication_subscription_request:
121+
description: |
122+
Set of databases corresponding to list of tables with schema notation in JSON format, which will be requested from
123+
publiblisher cluster to subscribe on via logical replication.
124+
Example: {"<database>": ["<schema>.<table>", ...], ...}
125+
type: string
126+
default: "{}"
120127
memory_maintenance_work_mem:
121128
description: |
122129
Sets the maximum memory (KB) to be used for maintenance operations.

lib/charms/postgresql_k8s/v1/postgresql.py

Lines changed: 380 additions & 2 deletions
Large diffs are not rendered by default.

metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ provides:
3030
interface: postgresql_async
3131
limit: 1
3232
optional: true
33+
logical-replication-offer:
34+
interface: postgresql_logical_replication
35+
optional: true
3336
database:
3437
interface: postgresql_client
3538
cos-agent:
@@ -45,6 +48,10 @@ requires:
4548
interface: tls-certificates
4649
limit: 1
4750
optional: true
51+
logical-replication:
52+
interface: postgresql_logical_replication
53+
limit: 1
54+
optional: true
4855
client-certificates:
4956
interface: tls-certificates
5057
limit: 1

src/backups.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@
4343
POSTGRESQL_DATA_PATH,
4444
UNIT_SCOPE,
4545
)
46+
from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION
47+
from relations.logical_replication import (
48+
LOGICAL_REPLICATION_OFFER_RELATION,
49+
LOGICAL_REPLICATION_RELATION,
50+
)
4651

4752
logger = logging.getLogger(__name__)
4853

@@ -1214,6 +1219,29 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
12141219
event.fail(error_message)
12151220
return False
12161221

1222+
logger.info("Checking that cluster does not have an active async replication relation")
1223+
for relation in [
1224+
self.model.get_relation(REPLICATION_CONSUMER_RELATION),
1225+
self.model.get_relation(REPLICATION_OFFER_RELATION),
1226+
]:
1227+
if not relation:
1228+
continue
1229+
error_message = "Unit cannot restore backup with an active async replication relation"
1230+
logger.error(f"Restore failed: {error_message}")
1231+
event.fail(error_message)
1232+
return False
1233+
1234+
logger.info("Checking that cluster does not have an active logical replication relation")
1235+
if self.model.get_relation(LOGICAL_REPLICATION_RELATION) or len(
1236+
self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())
1237+
):
1238+
error_message = (
1239+
"Unit cannot restore backup with an active logical replication connection"
1240+
)
1241+
logger.error(f"Restore failed: {error_message}")
1242+
event.fail(error_message)
1243+
return False
1244+
12171245
logger.info("Checking that this unit was already elected the leader unit")
12181246
if not self.charm.unit.is_leader():
12191247
error_message = "Unit cannot restore backup as it was not elected the leader unit yet"

src/charm.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@
122122
)
123123
from ldap import PostgreSQLLDAP
124124
from relations.async_replication import PostgreSQLAsyncReplication
125+
from relations.logical_replication import (
126+
LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS,
127+
PostgreSQLLogicalReplication,
128+
)
125129
from relations.postgresql_provider import PostgreSQLProvider
126130
from relations.tls import TLS
127131
from relations.tls_transfer import TLSTransfer
@@ -303,6 +307,7 @@ def __init__(self, *args):
303307
self.tls = TLS(self, PEER)
304308
self.tls_transfer = TLSTransfer(self, PEER)
305309
self.async_replication = PostgreSQLAsyncReplication(self)
310+
self.logical_replication = PostgreSQLLogicalReplication(self)
306311
self.restart_manager = RollingOpsManager(
307312
charm=self, relation="restart", callback=self._restart
308313
)
@@ -1445,6 +1450,9 @@ def _on_config_changed(self, event) -> None: # noqa: C901
14451450
# Update the sync-standby endpoint in the async replication data.
14461451
self.async_replication.update_async_replication_data()
14471452

1453+
if not self.logical_replication.apply_changed_config(event):
1454+
return
1455+
14481456
if not self.unit.is_leader():
14491457
return
14501458

@@ -1915,6 +1923,8 @@ def _on_update_status(self, _) -> None:
19151923

19161924
self.backup.coordinate_stanza_fields()
19171925

1926+
self.logical_replication.retry_validations()
1927+
19181928
self._set_primary_status_message()
19191929

19201930
# Restart topology observer if it is gone
@@ -2004,7 +2014,11 @@ def _can_run_on_update_status(self) -> bool:
20042014
logger.debug("Early exit on_update_status: Refresh in progress")
20052015
return False
20062016

2007-
if self.is_blocked and self.unit.status not in S3_BLOCK_MESSAGES:
2017+
if (
2018+
self.is_blocked
2019+
and self.unit.status not in S3_BLOCK_MESSAGES
2020+
and self.unit.status.message != LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS
2021+
):
20082022
# If charm was failing to disable plugin, try again (user may have removed the objects)
20092023
if self.unit.status.message == EXTENSION_OBJECT_MESSAGE:
20102024
self.enable_disable_extensions()
@@ -2048,6 +2062,12 @@ def _set_primary_status_message(self) -> None:
20482062
BlockedStatus(self.app_peer_data["s3-initialization-block-message"])
20492063
)
20502064
return
2065+
if self.unit.is_leader() and (
2066+
self.app_peer_data.get("logical-replication-validation") == "error"
2067+
or self.logical_replication.has_remote_publisher_errors()
2068+
):
2069+
self.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS)
2070+
return
20512071
if (
20522072
self._patroni.get_primary(unit_name_pattern=True) == self.unit.name
20532073
or self.is_standby_leader
@@ -2295,6 +2315,8 @@ def update_config(
22952315
self.model.config, self.get_available_memory(), limit_memory
22962316
)
22972317

2318+
replication_slots = self.logical_replication.replication_slots()
2319+
22982320
# Update and reload configuration based on TLS files availability.
22992321
self._patroni.render_patroni_yml_file(
23002322
connectivity=self.is_connectivity_enabled,
@@ -2310,6 +2332,7 @@ def update_config(
23102332
parameters=pg_parameters,
23112333
no_peers=no_peers,
23122334
user_databases_map=self.relations_user_databases_map,
2335+
slots=replication_slots or None,
23132336
)
23142337
if no_peers:
23152338
return True
@@ -2356,6 +2379,8 @@ def update_config(
23562379
"wal_keep_size": self.config.durability_wal_keep_size,
23572380
})
23582381

2382+
self._patroni.ensure_slots_controller_by_patroni(replication_slots)
2383+
23592384
self._handle_postgresql_restart_need()
23602385

23612386
cache = snap.SnapCache()
@@ -2454,18 +2479,36 @@ def client_relations(self) -> list[Relation]:
24542479
@property
24552480
def relations_user_databases_map(self) -> dict:
24562481
"""Returns a user->databases map for all relations."""
2457-
if not self.is_cluster_initialised or not self._patroni.member_started:
2458-
return {USER: "all", REPLICATION_USER: "all", REWIND_USER: "all"}
24592482
user_database_map = {}
2483+
# Copy relations users directly instead of waiting for them to be created
2484+
for relation in self.model.relations[self.postgresql_client_relation.relation_name]:
2485+
user = f"relation-{relation.id}"
2486+
if user not in user_database_map and (
2487+
database := self.postgresql_client_relation.database_provides.fetch_relation_field(
2488+
relation.id, "database"
2489+
)
2490+
):
2491+
user_database_map[user] = database
2492+
if not self.is_cluster_initialised or not self._patroni.member_started:
2493+
user_database_map.update({
2494+
USER: "all",
2495+
REPLICATION_USER: "all",
2496+
REWIND_USER: "all",
2497+
})
2498+
return user_database_map
24602499
try:
24612500
for user in self.postgresql.list_users_from_relation(
24622501
current_host=self.is_connectivity_enabled
24632502
):
2464-
user_database_map[user] = ",".join(
2503+
databases = ",".join(
24652504
self.postgresql.list_accessible_databases_for_user(
24662505
user, current_host=self.is_connectivity_enabled
24672506
)
24682507
)
2508+
if databases:
2509+
user_database_map[user] = databases
2510+
else:
2511+
logger.debug(f"User {user} has no databases to connect to")
24692512
# Add "landscape" superuser by default to the list when the "db-admin" relation is present.
24702513
if any(True for relation in self.client_relations if relation.name == "db-admin"):
24712514
user_database_map["landscape"] = "all"
@@ -2477,17 +2520,6 @@ def relations_user_databases_map(self) -> dict:
24772520
REPLICATION_USER: "all",
24782521
REWIND_USER: "all",
24792522
})
2480-
2481-
# Copy relations users directly instead of waiting for them to be created
2482-
for relation in self.model.relations[self.postgresql_client_relation.relation_name]:
2483-
user = f"relation-{relation.id}"
2484-
if user not in user_database_map and (
2485-
database
2486-
:= self.postgresql_client_relation.database_provides.fetch_relation_field(
2487-
relation.id, "database"
2488-
)
2489-
):
2490-
user_database_map[user] = database
24912523
return user_database_map
24922524
except PostgreSQLListUsersError:
24932525
logger.debug("relations_user_databases_map: Unable to get users")

src/cluster.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ def render_patroni_yml_file(
619619
parameters: dict[str, str] | None = None,
620620
no_peers: bool = False,
621621
user_databases_map: dict[str, str] | None = None,
622+
slots: dict[str, str] | None = None,
622623
) -> None:
623624
"""Render the Patroni configuration file.
624625
@@ -637,6 +638,7 @@ def render_patroni_yml_file(
637638
parameters: PostgreSQL parameters to be added to the postgresql.conf file.
638639
no_peers: Don't include peers.
639640
user_databases_map: map of databases to be accessible by each user.
641+
slots: replication slots (keys) with assigned database name (values).
640642
"""
641643
# Open the template patroni.yml file.
642644
with open("templates/patroni.yml.j2") as file:
@@ -686,6 +688,7 @@ def render_patroni_yml_file(
686688
ldap_parameters=self._dict_to_hba_string(ldap_params),
687689
patroni_password=self.patroni_password,
688690
user_databases_map=user_databases_map,
691+
slots=slots,
689692
)
690693
self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600)
691694

@@ -1002,6 +1005,41 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any
10021005
timeout=PATRONI_TIMEOUT,
10031006
)
10041007

1008+
def ensure_slots_controller_by_patroni(self, slots: dict[str, str]) -> None:
1009+
"""Synchronises slots controlled by Patroni with the provided state by removing unneeded slots and creating new ones.
1010+
1011+
Args:
1012+
slots: dictionary of slots in the {slot: database} format.
1013+
"""
1014+
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True):
1015+
with attempt:
1016+
current_config = requests.get(
1017+
f"{self._patroni_url}/config",
1018+
verify=self.verify,
1019+
timeout=PATRONI_TIMEOUT,
1020+
auth=self._patroni_auth,
1021+
)
1022+
if current_config.status_code != 200:
1023+
raise Exception(
1024+
f"Failed to get current Patroni config: {current_config.status_code} {current_config.text}"
1025+
)
1026+
slots_patch: dict[str, dict[str, str] | None] = dict.fromkeys(
1027+
current_config.json().get("slots", ())
1028+
)
1029+
for slot, database in slots.items():
1030+
slots_patch[slot] = {
1031+
"database": database,
1032+
"plugin": "pgoutput",
1033+
"type": "logical",
1034+
}
1035+
requests.patch(
1036+
f"{self._patroni_url}/config",
1037+
verify=self.verify,
1038+
json={"slots": slots_patch},
1039+
auth=self._patroni_auth,
1040+
timeout=PATRONI_TIMEOUT,
1041+
)
1042+
10051043
@property
10061044
def _synchronous_node_count(self) -> int:
10071045
planned_units = self.charm.app.planned_units()

src/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class CharmConfig(BaseConfigModel):
5252
logging_log_lock_waits: bool | None = Field(default=None)
5353
logging_log_min_duration_statement: int | None = Field(ge=-1, le=2147483647, default=None)
5454
logging_track_functions: Literal["none", "pl", "all"] | None = Field(default=None)
55+
logical_replication_subscription_request: str | None
5556
memory_maintenance_work_mem: int | None = Field(ge=1024, le=2147483647, default=None)
5657
memory_max_prepared_transactions: int | None = Field(ge=0, le=262143, default=None)
5758
memory_shared_buffers: int | None = Field(ge=16, le=1073741823, default=None)

0 commit comments

Comments
 (0)