Skip to content

Commit 59b58e9

Browse files
[MISC] Parallel patroni calls (#925)
* Switch to pydantic 2 * Initial tls v4 * Linting * IP sans * Add ips to dns sans * Add back cert update on ip change * Spaces addresses * Add peer cert relation * Fix rel check * Revert to reload on cert change * Get CA from the correct tls relation * Apply suggestions from code review Co-authored-by: Marcelo Henrique Neppel <[email protected]> * Add optimizer_cpu_tuple_cost constraints * Block on missing TLS rel * Fix tls test * Wrong key * Async patroni wip * Don't update on rel mismatch * Parallel requests * Disable unit tests * Async checks * Linting * Try to verify cert * Reenable network cut for arm * Reduce httpx logging * Try wait first completed * Replace httpx with aiohttp * Add back alternative endpoints and coro as_completed * Session for each async request * Back to tasks * Remove JujuVersion warning * Split tls enabled flags * Sync to dpl repo * Initial parallel observer * Bump lib and fix peer enablement * Peer checks * Internal cert * Fix internal ca check * Try not to defer peer change * Missed http calls * Peer CAs bundle for requests * Patroni magic config * Magic config for other users * Disable upgrade tests * Cache old cas * Remove logger * Fix charm int test * Correct schema and tls unit test * Try to defer if no certs * Handle Retry errors * Update libs * Revert cluster changes * Try getting alternative endpoints * Move ip change block before conf validation * Try to update IPs after potential deferrals * Update log message * Revert IP update tweaks * Remove client cert * Revert "Remove client cert" This reverts commit 9ca2287. 
* Squashed commit of the following: commit da3dd59 Author: Dragomir Penev <[email protected]> Date: Wed Jun 4 01:05:15 2025 +0300 Add sleep interval commit 141efaf Author: Dragomir Penev <[email protected]> Date: Wed Jun 4 00:35:32 2025 +0300 Don't defer on raft removal commit bcecb8c Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 23:44:30 2025 +0300 Log raft removal error commit d12515f Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 18:17:40 2025 +0300 Use peer addrs directly commit 03e5031 Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 16:47:54 2025 +0300 Use peer cert and key commit 762a9e3 Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 15:56:19 2025 +0300 Disable unit tests commit ee091df Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 15:50:40 2025 +0300 Try to suppress update status commit 24cdb8f Merge: f3befdb fb27850 Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 15:49:31 2025 +0300 Merge branch 'tlsv4' into tlsv4-conditional-validation commit f3befdb Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 14:29:01 2025 +0300 Try to wait for idle again commit a9e9d50 Author: Dragomir Penev <[email protected]> Date: Tue Jun 3 13:42:06 2025 +0300 Bump timeout commit 90a8f7d Author: Dragomir Penev <[email protected]> Date: Mon Jun 2 23:59:12 2025 +0300 Add reraising commit aa6a1ea Author: Dragomir Penev <[email protected]> Date: Mon Jun 2 21:36:39 2025 +0300 Try bumping timeout commit cf6e998 Author: Dragomir Penev <[email protected]> Date: Mon Jun 2 18:49:36 2025 +0300 Try to update ips first commit 08cea05 Author: Dragomir Penev <[email protected]> Date: Mon Jun 2 17:29:06 2025 +0300 Try to update conf on IP change without validation commit 6840707 Author: Dragomir Penev <[email protected]> Date: Mon Jun 2 16:55:30 2025 +0300 Try to log the exception commit e67489b Author: Dragomir Penev <[email protected]> Date: Mon Jun 2 16:35:00 2025 +0300 Debug network cut * Switch back to httpx * 
Fix httpx * Retry error when unable to reach cluster status * Re-enable upgrade tests * Try to mute asyncio message --------- Co-authored-by: Marcelo Henrique Neppel <[email protected]>
1 parent 03b1d71 commit 59b58e9

File tree

8 files changed

+149
-139
lines changed

8 files changed

+149
-139
lines changed

poetry.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ jinja2 = "^3.1.6"
1818
pysyncobj = "^0.3.14"
1919
psutil = "^7.0.0"
2020
charm-refresh = "^3.0.0.3"
21+
httpx = "^0.28.1"
2122

2223
[tool.poetry.group.charm-libs.dependencies]
2324
# data_platform_libs/v0/data_interfaces.py

scripts/cluster_topology_observer.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import json
77
import subprocess
88
import sys
9+
from asyncio import as_completed, get_running_loop, run, wait
10+
from contextlib import suppress
911
from os import environ
10-
from ssl import CERT_NONE, create_default_context
12+
from ssl import create_default_context
1113
from time import sleep
1214
from urllib.parse import urljoin
1315
from urllib.request import urlopen
@@ -16,6 +18,10 @@
1618

1719
API_REQUEST_TIMEOUT = 5
1820
PATRONI_CLUSTER_STATUS_ENDPOINT = "cluster"
21+
TLS_CA_BUNDLE_FILE = "peer_ca_bundle.pem"
22+
SNAP_CURRENT_PATH = "/var/snap/charmed-postgresql/current"
23+
SNAP_CONF_PATH = f"{SNAP_CURRENT_PATH}/etc"
24+
PATRONI_CONF_PATH = f"{SNAP_CONF_PATH}/patroni"
1925

2026
# File path for the spawned cluster topology observer process to write logs.
2127
LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"
@@ -25,6 +31,20 @@ class UnreachableUnitsError(Exception):
2531
"""Cannot reach any known cluster member."""
2632

2733

34+
def call_url(url, context):
    """Fetch a URL and return its JSON-decoded body.

    Failures are reported on stdout and swallowed rather than raised, so a
    single unreachable endpoint never aborts the caller.

    Args:
        url: address to request; the scheme is generated by the charm.
        context: SSL context passed to ``urlopen``.

    Returns:
        The decoded JSON document, or None when the request or decoding fails.
    """
    try:
        # Scheme is generated by the charm
        # The context manager closes the response (and its socket) once read.
        with urlopen(  # noqa: S310
            url,
            timeout=API_REQUEST_TIMEOUT,
            context=context,
        ) as resp:
            return json.loads(resp.read())
    except Exception as e:
        # Deliberately broad: any failure just means this endpoint gave no answer.
        print(f"Failed to contact {url} with {e}")
        return None
46+
47+
2848
def check_for_authorisation_rules_changes(run_cmd, unit, charm_dir, previous_authorisation_rules):
2949
"""Check for changes in the authorisation rules.
3050
@@ -120,7 +140,7 @@ def dispatch(run_cmd, unit, charm_dir, custom_event):
120140
subprocess.run([run_cmd, "-u", unit, dispatch_sub_cmd.format(custom_event, charm_dir)]) # noqa: S603
121141

122142

123-
def main():
143+
async def main():
124144
"""Main watch and dispatch loop.
125145
126146
Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
@@ -135,23 +155,19 @@ def main():
135155
while True:
136156
# Verify TLS against the peer CA bundle when the file exists
137157
context = create_default_context()
138-
context.check_hostname = False
139-
context.verify_mode = CERT_NONE
158+
with suppress(FileNotFoundError):
159+
context.load_verify_locations(cafile=f"{PATRONI_CONF_PATH}/{TLS_CA_BUNDLE_FILE}")
140160

141161
cluster_status = None
142-
for url in urls:
143-
try:
144-
# Scheme is generated by the charm
145-
resp = urlopen( # noqa: S310
146-
url,
147-
timeout=API_REQUEST_TIMEOUT,
148-
context=context,
149-
)
150-
cluster_status = json.loads(resp.read())
162+
loop = get_running_loop()
163+
tasks = [loop.run_in_executor(None, call_url, url, context) for url in urls]
164+
for task in as_completed(tasks):
165+
if result := await task:
166+
for task in tasks:
167+
task.cancel()
168+
await wait(tasks)
169+
cluster_status = result
151170
break
152-
except Exception as e:
153-
print(f"Failed to contact {url} with {e}")
154-
continue
155171
if not cluster_status:
156172
raise UnreachableUnitsError("Unable to reach cluster members")
157173
current_cluster_topology = {}
@@ -186,4 +202,4 @@ def main():
186202

187203

188204
if __name__ == "__main__":
189-
main()
205+
run(main())

src/charm.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
logger = logging.getLogger(__name__)
131131
logging.getLogger("httpx").setLevel(logging.WARNING)
132132
logging.getLogger("httpcore").setLevel(logging.WARNING)
133+
logging.getLogger("asyncio").setLevel(logging.WARNING)
133134

134135
PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit"
135136
EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs"

src/cluster.py

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,21 @@
1111
import re
1212
import shutil
1313
import subprocess
14+
from asyncio import as_completed, create_task, run, wait
15+
from contextlib import suppress
1416
from pathlib import Path
17+
from ssl import CERT_NONE, create_default_context
1518
from typing import TYPE_CHECKING, Any, TypedDict
1619

1720
import charm_refresh
1821
import psutil
1922
import requests
2023
from charms.operator_libs_linux.v2 import snap
24+
from httpx import AsyncClient, BasicAuth, HTTPError
2125
from jinja2 import Template
2226
from ops import BlockedStatus
2327
from pysyncobj.utility import TcpUtility, UtilityException
2428
from tenacity import (
25-
AttemptManager,
2629
RetryError,
2730
Retrying,
2831
retry,
@@ -172,6 +175,10 @@ def __init__(
172175
def _patroni_auth(self) -> requests.auth.HTTPBasicAuth:
173176
return requests.auth.HTTPBasicAuth("patroni", self.patroni_password)
174177

178+
@property
def _patroni_async_auth(self) -> BasicAuth:
    """httpx basic-auth credentials for the "patroni" user."""
    return BasicAuth(username="patroni", password=self.patroni_password)
181+
175182
@property
176183
def _patroni_url(self) -> str:
177184
"""Patroni REST API URL."""
@@ -249,28 +256,14 @@ def get_postgresql_version(self) -> str:
249256
if snp["name"] == charm_refresh.snap_name():
250257
return snp["version"]
251258

252-
def cluster_status(
253-
self, alternative_endpoints: list | None = None
254-
) -> list[ClusterMember] | None:
259+
def cluster_status(self, alternative_endpoints: list | None = None) -> list[ClusterMember]:
255260
"""Query the cluster status."""
256261
# Request info from cluster endpoint (which returns all members of the cluster).
257-
# TODO we don't know the other cluster's ca
258-
verify = self.verify if not alternative_endpoints else False
259-
for attempt in Retrying(
260-
stop=stop_after_attempt(
261-
len(alternative_endpoints) if alternative_endpoints else len(self.peers_ips)
262-
)
262+
if response := self.parallel_patroni_get_request(
263+
f"/{PATRONI_CLUSTER_STATUS_ENDPOINT}", alternative_endpoints
263264
):
264-
with attempt:
265-
request_url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
266-
267-
cluster_status = requests.get(
268-
f"{request_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
269-
verify=verify,
270-
timeout=API_REQUEST_TIMEOUT,
271-
auth=self._patroni_auth,
272-
)
273-
return cluster_status.json()["members"]
265+
return response["members"]
266+
raise RetryError(last_attempt=Exception("Unable to reach any units"))
274267

275268
def get_member_ip(self, member_name: str) -> str | None:
276269
"""Get cluster member IP address.
@@ -281,13 +274,14 @@ def get_member_ip(self, member_name: str) -> str | None:
281274
Returns:
282275
IP address of the cluster member.
283276
"""
284-
cluster_status = self.cluster_status()
285-
if not cluster_status:
286-
return
277+
try:
278+
cluster_status = self.cluster_status()
287279

288-
for member in cluster_status:
289-
if member["name"] == member_name:
290-
return member["host"]
280+
for member in cluster_status:
281+
if member["name"] == member_name:
282+
return member["host"]
283+
except RetryError:
284+
logger.debug("Unable to get IP. Cluster status unreachable")
291285

292286
def get_member_status(self, member_name: str) -> str:
293287
"""Get cluster member status.
@@ -307,6 +301,44 @@ def get_member_status(self, member_name: str) -> str:
307301
return member["state"]
308302
return ""
309303

304+
async def _httpx_get_request(self, url: str, verify: bool = True):
    """Send an authenticated GET to ``url`` and return the decoded JSON body.

    Args:
        url: full URL to request.
        verify: when True, the default SSL context is extended with the CA
            bundle file if it exists; when False, hostname checking and
            certificate verification are switched off.

    Returns:
        The parsed JSON response, or None when the request raises
        ``HTTPError`` or the body cannot be decoded (``ValueError``).
    """
    tls_context = create_default_context()
    if not verify:
        tls_context.check_hostname = False
        tls_context.verify_mode = CERT_NONE
    else:
        # A missing bundle is tolerated; the default trust store is used as is.
        with suppress(FileNotFoundError):
            tls_context.load_verify_locations(cafile=f"{PATRONI_CONF_PATH}/{TLS_CA_BUNDLE_FILE}")

    async with AsyncClient(
        auth=self._patroni_async_auth,
        timeout=API_REQUEST_TIMEOUT,
        verify=tls_context,
    ) as client:
        try:
            response = await client.get(url)
            return response.json()
        except (HTTPError, ValueError):
            return None
319+
320+
async def _async_get_request(self, uri: str, endpoints: list[str], verify: bool = True):
    """Query ``uri`` on every endpoint concurrently and return the first truthy reply.

    Args:
        uri: path appended to ``https://<endpoint>:8008``.
        endpoints: addresses to query.
        verify: forwarded to ``_httpx_get_request``.

    Returns:
        The first truthy result in completion order, or None when every
        request returned a falsy value or ``endpoints`` is empty.
    """
    tasks = [
        create_task(self._httpx_get_request(f"https://{ip}:8008{uri}", verify))
        for ip in endpoints
    ]
    try:
        for completed in as_completed(tasks):
            if result := await completed:
                return result
        return None
    finally:
        # Runs on success and on unexpected errors alike, so requests that are
        # still in flight are never left running behind the caller's back.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancellations settle before returning.
            await wait(pending)
331+
332+
def parallel_patroni_get_request(self, uri: str, endpoints: list[str] | None = None) -> dict:
333+
"""Call all possible patroni endpoints in parallel."""
334+
if not endpoints:
335+
endpoints = (self.unit_ip, *self.peers_ips)
336+
verify = True
337+
else:
338+
# TODO we don't know the other cluster's ca
339+
verify = False
340+
return run(self._async_get_request(uri, endpoints, verify))
341+
310342
def get_primary(
311343
self, unit_name_pattern=False, alternative_endpoints: list[str] | None = None
312344
) -> str | None:
@@ -320,14 +352,17 @@ def get_primary(
320352
primary pod or unit name.
321353
"""
322354
# Request info from cluster endpoint (which returns all members of the cluster).
323-
if cluster_status := self.cluster_status(alternative_endpoints):
355+
try:
356+
cluster_status = self.cluster_status(alternative_endpoints)
324357
for member in cluster_status:
325358
if member["role"] == "leader":
326359
primary = member["name"]
327360
if unit_name_pattern:
328361
# Change the last dash to / in order to match unit name pattern.
329362
primary = label2name(primary)
330363
return primary
364+
except RetryError:
365+
logger.debug("Unable to get primary. Cluster status unreachable")
331366

332367
def get_standby_leader(
333368
self, unit_name_pattern=False, check_whether_is_running: bool = False
@@ -366,31 +401,6 @@ def get_sync_standby_names(self) -> list[str]:
366401
sync_standbys.append(label2name(member["name"]))
367402
return sync_standbys
368403

369-
def _get_alternative_patroni_url(
370-
self, attempt: AttemptManager, alternative_endpoints: list[str] | None = None
371-
) -> str:
372-
"""Get an alternative REST API URL from another member each time.
373-
374-
When the Patroni process is not running in the current unit it's needed
375-
to use a URL from another cluster member REST API to do some operations.
376-
"""
377-
if alternative_endpoints is not None:
378-
return self._patroni_url.replace(
379-
self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1]
380-
)
381-
attempt_number = attempt.retry_state.attempt_number
382-
if attempt_number > 1:
383-
url = self._patroni_url
384-
if (attempt_number - 1) <= len(self.peers_ips):
385-
unit_number = attempt_number - 2
386-
else:
387-
unit_number = attempt_number - 2 - len(self.peers_ips)
388-
other_unit_ip = list(self.peers_ips)[unit_number]
389-
url = url.replace(self.unit_ip, other_unit_ip)
390-
else:
391-
url = self._patroni_url
392-
return url
393-
394404
def are_all_members_ready(self) -> bool:
395405
"""Check if all members are correctly running Patroni and PostgreSQL.
396406

tests/unit/test_charm.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ def test_on_start(harness):
602602
patch(
603603
"charm.PostgresqlOperatorCharm._is_storage_attached",
604604
side_effect=[False, True, True, True, True, True],
605-
) as _is_storage_attached,
605+
),
606606
patch(
607607
"charm.PostgresqlOperatorCharm._can_connect_to_postgresql",
608608
new_callable=PropertyMock,

0 commit comments

Comments
 (0)