Testing on GH CI #1076

Closed
wants to merge 8 commits
21 changes: 3 additions & 18 deletions lib/charms/postgresql_k8s/v1/postgresql.py
@@ -255,8 +255,10 @@ def _connect_to_database(
psycopg2 connection object.
"""
host = database_host if database_host is not None else self.primary_host
dbname = database if database else self.database
logger.debug(f"New DB connection: dbname='{dbname}' user='{self.user}' host='{host}' connect_timeout=1")
connection = psycopg2.connect(
f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'"
f"dbname='{dbname}' user='{self.user}' host='{host}'"
f"password='{self.password}' connect_timeout=1"
)
connection.autocommit = True
@@ -1268,23 +1270,6 @@ def update_user_password(
if connection is not None:
connection.close()

def is_restart_pending(self) -> bool:
"""Query pg_settings for pending restart."""
connection = None
try:
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM pg_settings WHERE pending_restart=True;")
return cursor.fetchone()[0] > 0
except psycopg2.OperationalError:
logger.warning("Failed to connect to PostgreSQL.")
return False
except psycopg2.Error as e:
logger.error(f"Failed to check if restart is pending: {e}")
return False
finally:
if connection:
connection.close()

def database_exists(self, db: str) -> bool:
"""Check whether specified database exists."""
connection = None
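
For context, the check removed above can still be reproduced at the SQL level; its REST-based replacement lives in src/cluster.py further down. A minimal, self-contained sketch of the SQL variant (the restart_pending_via_sql helper and the DSN are illustrative, not part of this PR):

import psycopg2

def restart_pending_via_sql(dsn: str) -> bool:
    """Return True if any PostgreSQL setting is waiting for a restart.

    Mirrors the pg_settings query from the removed is_restart_pending() above.
    """
    connection = psycopg2.connect(dsn)
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM pg_settings WHERE pending_restart=True;")
            return cursor.fetchone()[0] > 0
    finally:
        connection.close()
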
166 changes: 95 additions & 71 deletions src/charm.py

Large diffs are not rendered by default.

112 changes: 96 additions & 16 deletions src/cluster.py
@@ -15,6 +15,7 @@
from contextlib import suppress
from pathlib import Path
from ssl import CERT_NONE, create_default_context
from time import sleep
from typing import TYPE_CHECKING, Any, TypedDict

import charm_refresh
@@ -197,7 +198,7 @@ def _dict_to_hba_string(_dict: dict[str, Any]) -> str:

def bootstrap_cluster(self) -> bool:
"""Bootstrap a PostgreSQL cluster using Patroni."""
# Render the configuration files and start the cluster.
logger.debug("bootstrap_cluster: render the configuration files and start Patroni")
self.configure_patroni_on_unit()
return self.start_patroni()

@@ -260,7 +261,7 @@ def cluster_status(self, alternative_endpoints: list | None = None) -> list[ClusterMember]:
if response := self.parallel_patroni_get_request(
f"/{PATRONI_CLUSTER_STATUS_ENDPOINT}", alternative_endpoints
):
logger.debug("Patroni cluster members: %s", response["members"])
logger.debug("API cluster_status: %s", response["members"])
return response["members"]
raise RetryError(last_attempt=Exception("Unable to reach any units"))

@@ -436,9 +437,44 @@ def get_patroni_health(self) -> dict[str, str]:
timeout=API_REQUEST_TIMEOUT,
auth=self._patroni_auth,
)
logger.debug("API get_patroni_health: %s (%s)", r, r.elapsed.total_seconds())

return r.json()

def is_restart_pending(self) -> bool:
"""Return whether a Patroni/PostgreSQL restart is pending."""
pending_restart = self._get_patroni_restart_pending()
if pending_restart:
# Patroni 3.2.2 has a quirk: any change made through the REST API temporarily flags
# pending_restart=True. The flag clears within a second, but that is long enough to
# be caught by the charm. Sleep 2 seconds as a protection here until the Patroni
# 3.3.0 upgrade, then repeat the request to make sure pending_restart is still set.
sleep(2)
pending_restart = self._get_patroni_restart_pending()

return pending_restart

def _get_patroni_restart_pending(self) -> bool:
"""Returns whether the Patroni flag pending_restart on REST API."""
r = requests.get(
f"{self._patroni_url}/patroni",
verify=self.verify,
timeout=API_REQUEST_TIMEOUT,
auth=self._patroni_auth,
)

pending_restart = r.json().get("pending_restart", False)
logger.debug(
f"API is_restart_pending ({pending_restart}): %s (%s)",
r,
r.elapsed.total_seconds(),
)

return pending_restart
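
The two reads separated by a sleep above are effectively a debounce of a transiently raised flag. A standalone sketch of the same pattern (the check_fn callable and the 2-second default are illustrative, mirroring the code above):

from time import sleep
from typing import Callable

def debounced_flag(check_fn: Callable[[], bool], delay: float = 2.0) -> bool:
    """Return True only if check_fn still reports True after a short delay.

    Guards against flags that are raised only transiently, such as Patroni 3.2.2
    briefly reporting pending_restart=True after any REST API change.
    """
    if not check_fn():
        return False
    sleep(delay)  # give a transient flag time to clear
    return check_fn()
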

@property
def is_creating_backup(self) -> bool:
"""Returns whether a backup is being created."""
@@ -466,15 +502,20 @@ def is_replication_healthy(self) -> bool:
for members_ip in members_ips:
endpoint = "leader" if members_ip == primary_ip else "replica?lag=16kB"
url = self._patroni_url.replace(self.unit_ip, members_ip)
member_status = requests.get(
r = requests.get(
f"{url}/{endpoint}",
verify=self.verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
if member_status.status_code != 200:
logger.debug(
"API is_replication_healthy: %s (%s)",
r,
r.elapsed.total_seconds(),
)
if r.status_code != 200:
logger.debug(
f"Failed replication check for {members_ip} with code {member_status.status_code}"
f"Failed replication check for {members_ip} with code {r.status_code}"
)
raise Exception
except RetryError:
@@ -527,17 +568,20 @@ def is_member_isolated(self) -> bool:
try:
for attempt in Retrying(stop=stop_after_delay(10), wait=wait_fixed(3)):
with attempt:
cluster_status = requests.get(
r = requests.get(
f"{self._patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
timeout=API_REQUEST_TIMEOUT,
auth=self._patroni_auth,
)
logger.debug(
"API is_member_isolated: %s (%s)", r.json()["members"], r.elapsed.total_seconds()
)
except RetryError:
# Return False if it was not possible to get the cluster info. Try again later.
return False

return len(cluster_status.json()["members"]) == 0
return len(r.json()["members"]) == 0

def online_cluster_members(self) -> list[ClusterMember]:
"""Return list of online cluster members."""
@@ -568,15 +612,21 @@ def promote_standby_cluster(self) -> None:
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug(
"API promote_standby_cluster: %s (%s)",
config_response,
config_response.elapsed.total_seconds(),
)
if "standby_cluster" not in config_response.json():
raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted")
requests.patch(
r = requests.patch(
f"{self._patroni_url}/config",
verify=self.verify,
json={"standby_cluster": None},
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug("API promote_standby_cluster patch: %s (%s)", r, r.elapsed.total_seconds())
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
if self.get_primary() is None:
@@ -660,7 +710,7 @@ def render_patroni_yml_file(
partner_addrs=self.charm.async_replication.get_partner_addresses()
if not no_peers
else [],
peers_ips=self.peers_ips if not no_peers else set(),
peers_ips=sorted(self.peers_ips) if not no_peers else set(),
pgbackrest_configuration_file=PGBACKREST_CONFIGURATION_FILE,
scope=self.cluster_name,
self_ip=self.unit_ip,
@@ -699,6 +749,7 @@ def start_patroni(self) -> bool:
Whether the service started successfully.
"""
try:
logger.debug("Starting Patroni...")
cache = snap.SnapCache()
selected_snap = cache["charmed-postgresql"]
selected_snap.start(services=["patroni"])
@@ -718,6 +769,7 @@ def patroni_logs(self, num_lines: int | str | None = 10) -> str:
Multi-line logs string.
"""
try:
logger.debug("Getting Patroni logs...")
cache = snap.SnapCache()
selected_snap = cache["charmed-postgresql"]
return selected_snap.logs(services=["patroni"], num_lines=num_lines)
@@ -753,6 +805,7 @@ def stop_patroni(self) -> bool:
Whether the service stopped successfully.
"""
try:
logger.debug("Stopping Patroni...")
cache = snap.SnapCache()
selected_snap = cache["charmed-postgresql"]
selected_snap.stop(services=["patroni"])
@@ -764,7 +817,7 @@

def switchover(self, candidate: str | None = None) -> None:
"""Trigger a switchover."""
# Try to trigger the switchover.
logger.debug("Triggering the switchover to {candidate}")
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
current_primary = self.get_primary()
@@ -778,6 +831,7 @@ def switchover(self, candidate: str | None = None) -> None:
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug("API switchover: %s (%s)", r, r.elapsed.total_seconds())

# Check whether the switchover was unsuccessful.
if r.status_code != 200:
@@ -916,6 +970,7 @@ def remove_raft_member(self, member_ip: str) -> None:

# Leader doesn't always trigger when changing its own peer data.
if self.charm.unit.is_leader():
logger.debug("Emitting peer_relation_changed on leader")
self.charm.on[PEER].relation_changed.emit(
unit=self.charm.unit,
app=self.charm.app,
@@ -938,12 +993,14 @@ def remove_raft_member(self, member_ip: str) -> None:
@retry(stop=stop_after_attempt(20), wait=wait_exponential(multiplier=1, min=2, max=10))
def reload_patroni_configuration(self):
"""Reload Patroni configuration after it was changed."""
requests.post(
logger.debug("Reloading Patroni configuration...")
r = requests.post(
f"{self._patroni_url}/reload",
verify=self.verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug("API reload_patroni_configuration: %s (%s)", r, r.elapsed.total_seconds())

def is_patroni_running(self) -> bool:
"""Check if the Patroni service is running."""
@@ -962,6 +1019,7 @@ def restart_patroni(self) -> bool:
Whether the service restarted successfully.
"""
try:
logger.debug("Restarting Patroni...")
cache = snap.SnapCache()
selected_snap = cache["charmed-postgresql"]
selected_snap.restart(services=["patroni"])
@@ -974,36 +1032,45 @@
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def restart_postgresql(self) -> None:
"""Restart PostgreSQL."""
requests.post(
logger.debug("Restarting PostgreSQL...")
r = requests.post(
f"{self._patroni_url}/restart",
verify=self.verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug("API restart_postgresql: %s (%s)", r, r.elapsed.total_seconds())

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def reinitialize_postgresql(self) -> None:
"""Reinitialize PostgreSQL."""
requests.post(
logger.debug("Reinitializing PostgreSQL...")
r = requests.post(
f"{self._patroni_url}/reinitialize",
verify=self.verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug("API reinitialize_postgresql: %s (%s)", r, r.elapsed.total_seconds())

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any]) -> None:
"""Update the value of a parameter controller by Patroni.

For more information, check https://patroni.readthedocs.io/en/latest/patroni_configuration.html#postgresql-parameters-controlled-by-patroni.
"""
requests.patch(
r = requests.patch(
f"{self._patroni_url}/config",
verify=self.verify,
json={"postgresql": {"parameters": parameters}},
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug(
"API bulk_update_parameters_controller_by_patroni: %s (%s)",
r,
r.elapsed.total_seconds(),
)
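
For reference, the document PATCHed to Patroni's /config endpoint by the method above has the following shape (the parameter names are only examples):

import json

parameters = {"max_connections": 100, "shared_buffers": "256MB"}  # example input
body = {"postgresql": {"parameters": parameters}}  # what gets sent to PATCH <patroni_url>/config
print(json.dumps(body, indent=2))
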

def ensure_slots_controller_by_patroni(self, slots: dict[str, str]) -> None:
"""Synchronises slots controlled by Patroni with the provided state by removing unneeded slots and creating new ones.
@@ -1019,26 +1086,36 @@ def ensure_slots_controller_by_patroni(self, slots: dict[str, str]) -> None:
timeout=PATRONI_TIMEOUT,
auth=self._patroni_auth,
)
logger.debug(
"API ensure_slots_controller_by_patroni: %s (%s)",
current_config,
current_config.elapsed.total_seconds(),
)
if current_config.status_code != 200:
raise Exception(
f"Failed to get current Patroni config: {current_config.status_code} {current_config.text}"
)
slots_patch: dict[str, dict[str, str] | None] = dict.fromkeys(
current_config.json().get("slots", ())
current_config.json().get("slots", ()) or {}
)
for slot, database in slots.items():
slots_patch[slot] = {
"database": database,
"plugin": "pgoutput",
"type": "logical",
}
requests.patch(
r = requests.patch(
f"{self._patroni_url}/config",
verify=self.verify,
json={"slots": slots_patch},
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug(
"API ensure_slots_controller_by_patroni: %s (%s)",
r,
r.elapsed.total_seconds(),
)
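
A worked example of the slot synchronisation above, with hypothetical slot names: every slot currently present in the Patroni config is first mapped to None (removing it from the managed slots), then the desired slots are (re)defined on top:

current_slots = {"stale_slot": {"type": "logical"}}  # hypothetical GET /config "slots" section
desired = {"app_slot": "mydb"}                        # hypothetical `slots` argument

slots_patch: dict[str, dict[str, str] | None] = dict.fromkeys(current_slots or {})
for slot, database in desired.items():
    slots_patch[slot] = {"database": database, "plugin": "pgoutput", "type": "logical"}

# slots_patch == {
#     "stale_slot": None,
#     "app_slot": {"database": "mydb", "plugin": "pgoutput", "type": "logical"},
# }
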

@property
def _synchronous_node_count(self) -> int:
@@ -1066,6 +1143,9 @@ def update_synchronous_node_count(self) -> None:
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)
logger.debug(
"API update_synchronous_node_count: %s (%s)", r, r.elapsed.total_seconds()
)

# Check whether the update was unsuccessful.
if r.status_code != 200:
1 change: 1 addition & 0 deletions src/cluster_topology_observer.py
@@ -59,6 +59,7 @@ def __init__(self, charm: CharmBase, run_cmd: str):
def start_observer(self):
"""Start the cluster topology observer running in a new process."""
if not isinstance(self._charm.unit.status, ActiveStatus) or self._charm._peers is None:
logging.info("Early exit: skip topology observer start")
return
if "observer-pid" in self._charm._peers.data[self._charm.unit]:
# Double check that the PID exists
8 changes: 6 additions & 2 deletions src/relations/async_replication.py
@@ -251,8 +251,9 @@ def get_partner_addresses(self) -> list[str]:
or self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter")
== self._get_highest_promoted_cluster_counter_value()
):
logger.debug(f"Partner addresses: {self.charm._peer_members_ips}")
return self.charm._peer_members_ips
sorted_partners = sorted(self.charm._peer_members_ips)
logger.debug(f"Partner addresses: {sorted_partners}")
return sorted_partners

logger.debug("Partner addresses: []")
return []
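
Sorting here (and the matching peers_ips=sorted(...) change in src/cluster.py) makes the logged and rendered address lists deterministic; the underlying collections are sets, whose iteration order can differ between processes. A tiny illustration with placeholder IPs:

ips = {"10.0.0.3", "10.0.0.1", "10.0.0.2"}  # placeholder peer IPs held in a set

print(", ".join(ips))          # order may vary from run to run
print(", ".join(sorted(ips)))  # always "10.0.0.1, 10.0.0.2, 10.0.0.3"
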
@@ -653,6 +654,9 @@ def _primary_cluster_endpoint(self) -> str | None:
def _re_emit_async_relation_changed_event(self) -> None:
"""Re-emit the async relation changed event."""
relation = self._relation
logger.debug("Emitting async relation changed event")
remote_unit = next((unit for unit in relation.units if unit.app == relation.app), None)
logger.debug(f"Event details: relation={relation} unit={remote_unit} relation.app={relation.app}")
getattr(self.charm.on, f"{relation.name.replace('-', '_')}_relation_changed").emit(
relation,
app=relation.app,