Skip to content

Commit e3a249b

Browse files
authored
[DPE-7594] Sync up pg_hba changes and remove trigger (#1051)
* Sync relations with pg_hba * Port from K8s * Increase timeouts * Switch observer to httpx * Bump coverage * Tactical sleep * Try to clean up triggers * Use edge for spaces test * Blocking test app * Wrong host * Drop second trigger
1 parent 008cb9f commit e3a249b

22 files changed

+274
-308
lines changed

lib/charms/postgresql_k8s/v1/postgresql.py

Lines changed: 37 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -898,91 +898,6 @@ def set_up_database(self, temp_location: Optional[str] = None) -> None:
898898
self.set_up_login_hook_function()
899899
self.set_up_predefined_catalog_roles_function()
900900

901-
# Create database function and event trigger to identify users created by PgBouncer.
902-
cursor.execute("""
903-
CREATE OR REPLACE FUNCTION update_pg_hba()
904-
RETURNS event_trigger
905-
LANGUAGE plpgsql
906-
AS $$
907-
DECLARE
908-
temp_schema TEXT;
909-
hba_file TEXT;
910-
copy_command TEXT;
911-
connection_type TEXT;
912-
rec record;
913-
insert_value TEXT;
914-
changes INTEGER = 0;
915-
BEGIN
916-
-- Don't execute on replicas.
917-
IF NOT pg_is_in_recovery() THEN
918-
-- Load the current authorisation rules.
919-
SELECT nspname INTO temp_schema FROM pg_namespace WHERE oid = pg_my_temp_schema();
920-
IF temp_schema != '' THEN
921-
PERFORM TRUE FROM pg_tables WHERE schemaname = temp_schema AND tablename = 'pg_hba';
922-
IF FOUND THEN
923-
DROP TABLE pg_hba;
924-
END IF;
925-
PERFORM TRUE FROM pg_tables WHERE schemaname = temp_schema AND tablename = 'relation_users';
926-
IF FOUND THEN
927-
DROP TABLE relation_users;
928-
END IF;
929-
END IF;
930-
CREATE TEMPORARY TABLE pg_hba (lines TEXT);
931-
SELECT setting INTO hba_file FROM pg_settings WHERE name = 'hba_file';
932-
IF hba_file IS NOT NULL THEN
933-
copy_command='COPY pg_hba FROM ''' || hba_file || '''' ;
934-
EXECUTE copy_command;
935-
-- Build a list of the relation users and the databases they can access.
936-
CREATE TEMPORARY TABLE relation_users AS
937-
SELECT t.user, STRING_AGG(DISTINCT t.database, ',') AS databases FROM( SELECT u.usename AS user, CASE WHEN u.usesuper THEN 'all' ELSE d.datname END AS database FROM ( SELECT usename, usesuper FROM pg_catalog.pg_user WHERE usename NOT IN ('backup', 'monitoring', 'operator', 'postgres', 'replication', 'rewind')) AS u JOIN ( SELECT datname FROM pg_catalog.pg_database WHERE NOT datistemplate ) AS d ON has_database_privilege(u.usename, d.datname, 'CONNECT') ) AS t GROUP BY 1;
938-
IF (SELECT COUNT(lines) FROM pg_hba WHERE lines LIKE 'hostssl %') > 0 THEN
939-
connection_type := 'hostssl';
940-
ELSE
941-
connection_type := 'host';
942-
END IF;
943-
-- Add the new users to the pg_hba file.
944-
FOR rec IN SELECT * FROM relation_users
945-
LOOP
946-
insert_value := connection_type || ' ' || rec.databases || ' ' || rec.user || ' 0.0.0.0/0 md5';
947-
IF (SELECT COUNT(lines) FROM pg_hba WHERE lines = insert_value) = 0 THEN
948-
INSERT INTO pg_hba (lines) VALUES (insert_value);
949-
changes := changes + 1;
950-
END IF;
951-
END LOOP;
952-
-- Remove users that don't exist anymore from the pg_hba file.
953-
FOR rec IN SELECT h.lines FROM pg_hba AS h LEFT JOIN relation_users AS r ON SPLIT_PART(h.lines, ' ', 3) = r.user WHERE r.user IS NULL AND (SPLIT_PART(h.lines, ' ', 3) LIKE 'relation_id_%' OR SPLIT_PART(h.lines, ' ', 3) LIKE 'pgbouncer_auth_relation_%' OR SPLIT_PART(h.lines, ' ', 3) LIKE '%_user_%_%')
954-
LOOP
955-
DELETE FROM pg_hba WHERE lines = rec.lines;
956-
changes := changes + 1;
957-
END LOOP;
958-
-- Apply the changes to the pg_hba file.
959-
IF changes > 0 THEN
960-
copy_command='COPY pg_hba TO ''' || hba_file || '''' ;
961-
EXECUTE copy_command;
962-
PERFORM pg_reload_conf();
963-
END IF;
964-
END IF;
965-
END IF;
966-
END;
967-
$$ SECURITY DEFINER;
968-
""")
969-
cursor.execute(
970-
"SELECT TRUE FROM pg_event_trigger WHERE evtname = 'update_pg_hba_on_create_schema';"
971-
)
972-
if cursor.fetchone() is None:
973-
cursor.execute("""
974-
CREATE EVENT TRIGGER update_pg_hba_on_create_schema
975-
ON ddl_command_end
976-
WHEN TAG IN ('CREATE SCHEMA')
977-
EXECUTE FUNCTION update_pg_hba();
978-
""")
979-
cursor.execute("""
980-
CREATE EVENT TRIGGER update_pg_hba_on_drop_schema
981-
ON ddl_command_end
982-
WHEN TAG IN ('DROP SCHEMA')
983-
EXECUTE FUNCTION update_pg_hba();
984-
""")
985-
986901
connection.close()
987902
connection = None
988903

@@ -1403,3 +1318,40 @@ def is_user_in_hba(self, username: str) -> bool:
14031318
finally:
14041319
if connection:
14051320
connection.close()
1321+
1322+
def drop_hba_triggers(self) -> None:
1323+
"""Drop pg_hba triggers on schema change."""
1324+
try:
1325+
with self._connect_to_database() as connection, connection.cursor() as cursor:
1326+
cursor.execute(
1327+
SQL(
1328+
"SELECT datname FROM pg_database WHERE datname <> 'template0' AND datname <>'postgres';"
1329+
)
1330+
)
1331+
databases = [row[0] for row in cursor.fetchall()]
1332+
except psycopg2.Error as e:
1333+
logger.warning(f"Failed to get databases when removing hba trigger: {e}")
1334+
return
1335+
finally:
1336+
if connection:
1337+
connection.close()
1338+
1339+
# Existing objects need to be reassigned in each database
1340+
# before the user can be deleted.
1341+
1342+
for database in databases:
1343+
try:
1344+
with self._connect_to_database(
1345+
database
1346+
) as connection, connection.cursor() as cursor:
1347+
cursor.execute(
1348+
SQL("DROP EVENT TRIGGER IF EXISTS update_pg_hba_on_create_schema;")
1349+
)
1350+
cursor.execute(
1351+
SQL("DROP EVENT TRIGGER IF EXISTS update_pg_hba_on_drop_schema;")
1352+
)
1353+
except psycopg2.Error as e:
1354+
logger.warning(f"Failed to remove hba trigger for {database}: {e}")
1355+
finally:
1356+
if connection:
1357+
connection.close()

scripts/cluster_topology_observer.py

Lines changed: 37 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,25 @@
33

44
"""Cluster topology changes observer."""
55

6-
import json
76
import subprocess
87
import sys
9-
from asyncio import as_completed, get_running_loop, run, wait
8+
from asyncio import as_completed, create_task, run, wait
109
from contextlib import suppress
11-
from os import environ
1210
from ssl import create_default_context
1311
from time import sleep
1412
from urllib.parse import urljoin
15-
from urllib.request import urlopen
1613

14+
import psycopg2
1715
import yaml
16+
from httpx import AsyncClient
1817

1918
API_REQUEST_TIMEOUT = 5
2019
PATRONI_CLUSTER_STATUS_ENDPOINT = "cluster"
2120
TLS_CA_BUNDLE_FILE = "peer_ca_bundle.pem"
2221
SNAP_CURRENT_PATH = "/var/snap/charmed-postgresql/current"
2322
SNAP_CONF_PATH = f"{SNAP_CURRENT_PATH}/etc"
2423
PATRONI_CONF_PATH = f"{SNAP_CONF_PATH}/patroni"
24+
PATRONI_CONF_FILE_PATH = f"{PATRONI_CONF_PATH}/patroni.yaml"
2525

2626
# File path for the spawned cluster topology observer process to write logs.
2727
LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"
@@ -31,40 +31,16 @@ class UnreachableUnitsError(Exception):
3131
"""Cannot reach any known cluster member."""
3232

3333

34-
def call_url(url, context):
35-
"""Task handler for calling an url."""
36-
try:
37-
# Scheme is generated by the charm
38-
resp = urlopen( # noqa: S310
39-
url,
40-
timeout=API_REQUEST_TIMEOUT,
41-
context=context,
42-
)
43-
return json.loads(resp.read())
44-
except Exception as e:
45-
print(f"Failed to contact {url} with {e}")
46-
47-
48-
def check_for_authorisation_rules_changes(run_cmd, unit, charm_dir, previous_authorisation_rules):
49-
"""Check for changes in the authorisation rules.
50-
51-
If changes are detected, dispatch an event to handle them.
52-
"""
53-
# Read contents from the pg_hba.conf file.
54-
with open("/var/snap/charmed-postgresql/common/var/lib/postgresql/pg_hba.conf") as file:
55-
current_authorisation_rules = file.read()
56-
current_authorisation_rules = [
57-
line for line in current_authorisation_rules.splitlines() if not line.startswith("#")
58-
]
59-
# If it's the first time the authorisation rules were retrieved, then store it and use
60-
# it for subsequent checks.
61-
if not previous_authorisation_rules:
62-
previous_authorisation_rules = current_authorisation_rules
63-
# If the authorisation rules changed, dispatch a charm event to handle this change.
64-
elif current_authorisation_rules != previous_authorisation_rules:
65-
previous_authorisation_rules = current_authorisation_rules
66-
dispatch(run_cmd, unit, charm_dir, "authorisation_rules_change")
67-
return previous_authorisation_rules
34+
async def _httpx_get_request(url: str):
35+
ssl_ctx = create_default_context()
36+
with suppress(FileNotFoundError):
37+
ssl_ctx.load_verify_locations(cafile=f"{PATRONI_CONF_PATH}/{TLS_CA_BUNDLE_FILE}")
38+
async with AsyncClient(timeout=API_REQUEST_TIMEOUT, verify=ssl_ctx) as client:
39+
try:
40+
return (await client.get(url)).json()
41+
except Exception as e:
42+
print(f"Failed to contact {url} with {e}")
43+
return None
6844

6945

7046
def check_for_cluster_topology_changes(
@@ -90,34 +66,29 @@ def check_for_database_changes(run_cmd, unit, charm_dir, previous_databases):
9066
9167
If changes are detected, dispatch an event to handle them.
9268
"""
93-
conf_file_path = "/var/snap/charmed-postgresql/current/etc/patroni/patroni.yaml"
94-
with open(conf_file_path) as conf_file:
69+
with open(PATRONI_CONF_FILE_PATH) as conf_file:
9570
conf_file_contents = yaml.safe_load(conf_file)
96-
username = conf_file_contents["postgresql"]["authentication"]["superuser"]["username"]
9771
password = conf_file_contents["postgresql"]["authentication"]["superuser"]["password"]
98-
env = environ.copy()
99-
env["PGPASSWORD"] = password
100-
# Fake cert location for patronictl
101-
env["PGSSLCERT"] = "/var/snap/charmed-postgresql/current/etc/patroni/nonexistent_cert.pem"
102-
command = [
103-
"sudo",
104-
"-E",
105-
"-H",
106-
"charmed-postgresql.patronictl",
107-
"-c",
108-
conf_file_path,
109-
"query",
110-
"-c",
111-
"SELECT datname,datacl FROM pg_database;",
112-
"-U",
113-
username,
114-
"-d",
115-
"postgres",
116-
]
72+
listen_addr = conf_file_contents["postgresql"]["listen"]
73+
if not listen_addr or ":" not in listen_addr:
74+
with open(LOG_FILE_PATH, "a") as log_file:
75+
log_file.write("Failed to retrieve databases: Host not set yet.\n")
76+
return previous_databases
77+
host, _ = listen_addr.split(":")
78+
79+
connection = None
11780
try:
11881
# Input is generated by the charm
119-
current_databases = subprocess.check_output(command, env=env) # noqa: S603
120-
except subprocess.CalledProcessError as e:
82+
with (
83+
psycopg2.connect(
84+
f"dbname='postgres' user='operator' host='{host}' "
85+
f"password='{password}' connect_timeout=1"
86+
) as connection,
87+
connection.cursor() as cursor,
88+
):
89+
cursor.execute("SELECT datname, datacl FROM pg_database;")
90+
current_databases = cursor.fetchall()
91+
except psycopg2.Error as e:
12192
with open(LOG_FILE_PATH, "a") as log_file:
12293
log_file.write(f"Failed to retrieve databases: {e}\n")
12394
return previous_databases
@@ -131,6 +102,9 @@ def check_for_database_changes(run_cmd, unit, charm_dir, previous_databases):
131102
previous_databases = current_databases
132103
dispatch(run_cmd, unit, charm_dir, "databases_change")
133104
return previous_databases
105+
finally:
106+
if connection:
107+
connection.close()
134108

135109

136110
def dispatch(run_cmd, unit, charm_dir, custom_event):
@@ -148,7 +122,6 @@ async def main():
148122
patroni_urls, run_cmd, unit, charm_dir = sys.argv[1:]
149123

150124
previous_cluster_topology = {}
151-
previous_authorisation_rules = []
152125
previous_databases = None
153126
urls = [urljoin(url, PATRONI_CLUSTER_STATUS_ENDPOINT) for url in patroni_urls.split(",")]
154127
member_name = unit.replace("/", "-")
@@ -159,8 +132,7 @@ async def main():
159132
context.load_verify_locations(cafile=f"{PATRONI_CONF_PATH}/{TLS_CA_BUNDLE_FILE}")
160133

161134
cluster_status = None
162-
loop = get_running_loop()
163-
tasks = [loop.run_in_executor(None, call_url, url, context) for url in urls]
135+
tasks = [create_task(_httpx_get_request(url)) for url in urls]
164136
for task in as_completed(tasks):
165137
if result := await task:
166138
for task in tasks:
@@ -190,9 +162,6 @@ async def main():
190162
)
191163

192164
if is_primary:
193-
previous_authorisation_rules = check_for_authorisation_rules_changes(
194-
run_cmd, unit, charm_dir, previous_authorisation_rules
195-
)
196165
previous_databases = check_for_database_changes(
197166
run_cmd, unit, charm_dir, previous_databases
198167
)

0 commit comments

Comments
 (0)