
Commit 76e8bb0

Clean cluster demotion (patroni#3361)
Make sure introduction of the "standby_cluster" section in the dynamic configuration leads to a clean cluster demotion
1 parent bc0c423 commit 76e8bb0
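
For context: a running Patroni cluster is demoted by adding a "standby_cluster" section to the dynamic configuration, typically with a PATCH request against the REST API. A minimal illustration (the endpoint, host and restore_command values are examples only, not taken from this commit):

    PATCH http://127.0.0.1:8008/config
    {
        "standby_cluster": {
            "host": "primary.example.com",
            "port": 5432,
            "restore_command": "cp /wal_archive/%f %p"
        }
    }

Removing the section again (PATCH with {"standby_cluster": null}) promotes the cluster back to a regular primary; the new acceptance-test scenarios below exercise both directions.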

File tree

9 files changed (+208 -46 lines)


features/environment.py

Lines changed: 11 additions & 6 deletions
@@ -123,6 +123,12 @@ def read_label(self, label):
         except IOError:
             return None
 
+    def read_config(self):
+        config = dict()
+        with open(self._config) as r:
+            config = yaml.safe_load(r)
+        return config
+
     @staticmethod
     def recursive_update(dst, src):
         for k, v in src.items():
@@ -132,11 +138,10 @@ def recursive_update(dst, src):
                 dst[k] = v
 
     def update_config(self, custom_config):
-        with open(self._config) as r:
-            config = yaml.safe_load(r)
-        self.recursive_update(config, custom_config)
-        with open(self._config, 'w') as w:
-            yaml.safe_dump(config, w, default_flow_style=False)
+        config = self.read_config()
+        self.recursive_update(config, custom_config)
+        with open(self._config, 'w') as w:
+            yaml.safe_dump(config, w, default_flow_style=False)
         self._scope = config.get('scope', 'batman')
 
     def add_tag_to_config(self, tag, value):
@@ -857,7 +862,7 @@ def start(self, name, max_wait_limit=40, custom_config=None):
         self._processes[name].start(max_wait_limit)
 
     def __getattr__(self, func):
-        if func not in ['stop', 'query', 'write_label', 'read_label', 'check_role_has_changed_to',
+        if func not in ['stop', 'query', 'write_label', 'read_label', 'read_config', 'check_role_has_changed_to',
                         'add_tag_to_config', 'get_watchdog', 'patroni_hang', 'backup', 'read_patroni_log']:
             raise AttributeError("PatroniPoolController instance has no attribute '{0}'".format(func))

features/standby_cluster.feature

Lines changed: 14 additions & 0 deletions
@@ -64,3 +64,17 @@ Feature: standby cluster
     And I receive a response role standby_leader
     And replication works from postgres-0 to postgres-1 after 15 seconds
     And there is a postgres-1_cb.log with "on_role_change replica batman1\non_role_change standby_leader batman1" in postgres-1 data directory
+
+  Scenario: demote cluster
+    When I switch standby cluster batman1 to archive recovery
+    Then Response on GET http://127.0.0.1:8009/patroni contains replication_state=in archive recovery after 30 seconds
+    When I demote cluster batman
+    And "members/postgres-0" key in DCS has role=standby_leader after 20 seconds
+    And "members/postgres-0" key in DCS has state=running after 10 seconds
+
+  Scenario: promote cluster
+    When I issue a PATCH request to http://127.0.0.1:8009/config with {"standby_cluster": null}
+    Then I receive a response code 200
+    And postgres-1 role is the primary after 10 seconds
+    When I add the table foo2 to postgres-1
+    Then table foo2 is present on postgres-0 after 20 seconds

features/steps/standby_cluster.py

Lines changed: 28 additions & 0 deletions
@@ -1,7 +1,9 @@
+import json
 import os
 import time
 
 from behave import step
+from patroni_api import check_response, do_request
 
 
 def callbacks(context, name):
@@ -66,3 +68,29 @@ def check_replication_status(context, pg_name1, pg_name2, timeout):
         time.sleep(1)
     else:
         assert False, "{0} is not replicating from {1} after {2} seconds".format(pg_name1, pg_name2, timeout)
+
+
+@step('I switch standby cluster {scope:name} to archive recovery')
+def standby_cluster_archive(context, scope, demote=False):
+    for name, proc in context.pctl._processes.items():
+        if proc._scope != scope or not proc._is_running:
+            continue
+
+        config = context.pctl.read_config(name)
+        url = f'http://{config["restapi"]["connect_address"]}/config'
+        data = {
+            "standby_cluster": {
+                "restore_command": config['bootstrap']['dcs']['postgresql']['parameters']['restore_command']
+            }
+        }
+        if not demote:
+            data['standby_cluster']['primary_slot_name'] = data['standby_cluster']['host'] =\
+                data['standby_cluster']['port'] = None
+        do_request(context, 'PATCH', url, json.dumps(data))
+        check_response(context, 'code', 200)
+        break
+
+
+@step('I demote cluster {scope:name}')
+def demote_cluster(context, scope):
+    standby_cluster_archive(context, scope, True)

patroni/ha.py

Lines changed: 47 additions & 11 deletions
@@ -8,7 +8,7 @@
 
 from multiprocessing.pool import ThreadPool
 from threading import RLock
-from typing import Any, Callable, Collection, Dict, List, NamedTuple, Optional, Tuple, TYPE_CHECKING, Union
+from typing import Any, Callable, cast, Collection, Dict, List, NamedTuple, Optional, Tuple, TYPE_CHECKING, Union
 
 from . import global_config, psycopg
 from .__main__ import Patroni
@@ -746,7 +746,10 @@ def follow(self, demote_reason: str, follow_reason: str, refresh: bool = True) -
         if not node_to_follow:
             return 'no action. I am ({0})'.format(self.state_handler.name)
         elif is_leader:
-            self.demote('immediate-nolock')
+            if self.is_standby_cluster():
+                self._async_executor.try_run_async('demoting to a standby cluster', self.demote, ('demote-cluster',))
+            else:
+                self.demote('immediate-nolock')
             return demote_reason
 
         if self.is_standby_cluster() and self._leader_timeline and \
@@ -1563,6 +1566,7 @@ def demote(self, mode: str) -> Optional[bool]:
             'graceful': dict(stop='fast', checkpoint=True, release=True, offline=False, async_req=False),  # noqa: E241,E501
             'immediate': dict(stop='immediate', checkpoint=False, release=True, offline=False, async_req=True),  # noqa: E241,E501
             'immediate-nolock': dict(stop='immediate', checkpoint=False, release=False, offline=False, async_req=True),  # noqa: E241,E501
+            'demote-cluster': dict(stop='fast', checkpoint=False, release=True, offline=False, async_req=False),  # noqa: E241,E501
 
         }[mode]
 
@@ -1572,6 +1576,16 @@ def demote(self, mode: str) -> Optional[bool]:
 
         status = {'released': False}
 
+        demote_cluster_with_archive = False
+        archive_cmd = self._rewind.get_archive_command()
+        if mode == 'demote-cluster' and archive_cmd is not None:
+            # We need to send the shutdown checkpoint WAL file to archive to eliminate the need of rewind
+            # from a promoted instance that was previously replicating from archive
+            # When doing this, we disable stop timeout, do not run on_shutdown callback and do not release
+            # leader key.
+            demote_cluster_with_archive = True
+            mode_control['release'] = False
+
         def on_shutdown(checkpoint_location: int, prev_location: int) -> None:
             # Postmaster is still running, but pg_control already reports clean "shut down".
             # It could happen if Postgres is still archiving the backlog of WAL files.
@@ -1580,8 +1594,11 @@ def on_shutdown(checkpoint_location: int, prev_location: int) -> None:
             time.sleep(1)  # give replicas some more time to catch up
             if self.is_failover_possible(cluster_lsn=checkpoint_location):
                 self.state_handler.set_role(PostgresqlRole.DEMOTED)
+                # for demotion to a standby cluster we need shutdown checkpoint lsn to be written to optime,
+                # not the prev one
+                last_lsn = checkpoint_location if mode == 'demote-cluster' else prev_location
                 with self._async_executor:
-                    self.release_leader_key_voluntarily(prev_location)
+                    self.release_leader_key_voluntarily(last_lsn)
                 status['released'] = True
 
         def before_shutdown() -> None:
@@ -1594,16 +1611,33 @@ def before_shutdown() -> None:
                                 on_safepoint=self.watchdog.disable if self.watchdog.is_running else None,
                                 on_shutdown=on_shutdown if mode_control['release'] else None,
                                 before_shutdown=before_shutdown if mode == 'graceful' else None,
-                                stop_timeout=self.primary_stop_timeout())
+                                stop_timeout=None if demote_cluster_with_archive else self.primary_stop_timeout())
         self.state_handler.set_role(PostgresqlRole.DEMOTED)
-        self.set_is_leader(False)
+
+        # for demotion to a standby cluster we need shutdown checkpoint lsn to be written to optime, not the prev one
+        checkpoint_lsn, prev_lsn = self.state_handler.latest_checkpoint_locations() \
+            if mode in ('graceful', 'demote-cluster') else (None, None)
+
+        is_standby_leader = mode == 'demote-cluster' and not status['released']
+        if is_standby_leader:
+            with self._async_executor:
+                self.dcs.update_leader(self.cluster, checkpoint_lsn, None, self._failsafe_config())
+            mode_control['release'] = False
+        else:
+            self.set_is_leader(False)
 
         if mode_control['release']:
             if not status['released']:
-                checkpoint_location = self.state_handler.latest_checkpoint_location() if mode == 'graceful' else None
                 with self._async_executor:
-                    self.release_leader_key_voluntarily(checkpoint_location)
+                    self.release_leader_key_voluntarily(prev_lsn)
             time.sleep(2)  # Give a time to somebody to take the leader lock
+
+        if mode == 'demote-cluster':
+            if demote_cluster_with_archive:
+                self._rewind.archive_shutdown_checkpoint_wal(cast(str, archive_cmd))
+            else:
+                logger.info('Not archiving latest checkpoint WAL file. Archiving is not configured.')
+
         if mode_control['offline']:
             node_to_follow, leader = None, None
         else:
@@ -1616,15 +1650,17 @@ def before_shutdown() -> None:
         if self.is_synchronous_mode():
             self.state_handler.sync_handler.set_synchronous_standby_names(CaseInsensitiveSet())
 
+        role = PostgresqlRole.STANDBY_LEADER if is_standby_leader else PostgresqlRole.REPLICA
         # FIXME: with mode offline called from DCS exception handler and handle_long_action_in_progress
         # there could be an async action already running, calling follow from here will lead
         # to racy state handler state updates.
         if mode_control['async_req']:
-            self._async_executor.try_run_async('starting after demotion', self.state_handler.follow, (node_to_follow,))
+            self._async_executor.try_run_async('starting after demotion', self.state_handler.follow,
+                                               (node_to_follow, role,))
         else:
             if self._rewind.rewind_or_reinitialize_needed_and_possible(leader):
                 return False  # do not start postgres, but run pg_rewind on the next iteration
-            self.state_handler.follow(node_to_follow)
+            self.state_handler.follow(node_to_follow, role)
 
     def should_run_scheduled_action(self, action_name: str, scheduled_at: Optional[datetime.datetime],
                                     cleanup_fn: Callable[..., Any]) -> bool:
@@ -2363,8 +2399,8 @@ def _before_shutdown() -> None:
                 stop_timeout=self.primary_stop_timeout()))
             if not self.state_handler.is_running():
                 if self.is_leader() and not status['deleted']:
-                    checkpoint_location = self.state_handler.latest_checkpoint_location()
-                    self.dcs.delete_leader(self.cluster.leader, checkpoint_location)
+                    _, prev_location = self.state_handler.latest_checkpoint_locations()
+                    self.dcs.delete_leader(self.cluster.leader, prev_location)
                 self.touch_member()
             else:
                 # XXX: what about when Patroni is started as the wrong user that has access to the watchdog device

patroni/postgresql/__init__.py

Lines changed: 7 additions & 17 deletions
@@ -623,14 +623,16 @@ def parse_wal_record(self, timeline: str,
                 return match.group(1), match.group(2), match.group(3), match.group(4)
         return None, None, None, None
 
-    def _checkpoint_locations_from_controldata(self, data: Dict[str, str]) -> Optional[Tuple[int, int]]:
+    def latest_checkpoint_locations(self, data: Optional[Dict[str, str]] = None) -> Tuple[Optional[int], Optional[int]]:
         """Get shutdown checkpoint location.
 
         :param data: :class:`dict` object with values returned by `pg_controldata` tool.
 
         :returns: a tuple of checkpoint LSN for the cleanly shut down primary, and LSN of prev wal record (SWITCH)
                   if we know that the checkpoint was written to the new WAL file due to the archive_mode=on.
         """
+        if data is None:
+            data = self.controldata()
         timeline = data.get("Latest checkpoint's TimeLineID")
         lsn = checkpoint_lsn = data.get('Latest checkpoint location')
         prev_lsn = None
@@ -651,19 +653,7 @@ def _checkpoint_locations_from_controldata(self, data: Dict[str, str]) -> Optional[Tuple[int, int]]:
             logger.error('Exception when parsing WAL pg_%sdump output: %r', self.wal_name, e)
         if isinstance(checkpoint_lsn, int):
             return checkpoint_lsn, (prev_lsn or checkpoint_lsn)
-
-    def latest_checkpoint_location(self) -> Optional[int]:
-        """Get shutdown checkpoint location.
-
-        .. note::
-            In case if checkpoint was written to the new WAL file due to the archive_mode=on
-            we return LSN of the previous wal record (SWITCH).
-
-        :returns: checkpoint LSN for the cleanly shut down primary.
-        """
-        checkpoint_locations = self._checkpoint_locations_from_controldata(self.controldata())
-        if checkpoint_locations:
-            return checkpoint_locations[1]
+        return None, None
 
     def is_running(self) -> Optional[PostmasterProcess]:
         """Returns PostmasterProcess if one is running on the data directory or None. If most recently seen process
@@ -922,9 +912,9 @@ def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool,
         while postmaster.is_running():
             data = self.controldata()
             if data.get('Database cluster state', '') == 'shut down':
-                checkpoint_locations = self._checkpoint_locations_from_controldata(data)
-                if checkpoint_locations:
-                    on_shutdown(*checkpoint_locations)
+                checkpoint_lsn, prev_lsn = self.latest_checkpoint_locations(data)
+                if checkpoint_lsn is not None and prev_lsn is not None:
+                    on_shutdown(checkpoint_lsn, prev_lsn)
                 break
             elif data.get('Database cluster state', '').startswith('shut down'):  # shut down in recovery
                 break

patroni/postgresql/rewind.py

Lines changed: 26 additions & 3 deletions
@@ -327,6 +327,16 @@ def ensure_checkpoint_after_promote(self, wakeup: Callable[..., Any]) -> None:
     def checkpoint_after_promote(self) -> bool:
         return self._state == REWIND_STATUS.CHECKPOINT
 
+    def get_archive_command(self) -> Optional[str]:
+        """Get ``archive_command`` GUC value if defined and archiving is enabled.
+
+        :returns: ``archive_command`` defined in the Postgres configuration or None.
+        """
+        archive_mode = self._postgresql.get_guc_value('archive_mode')
+        archive_cmd = self._postgresql.get_guc_value('archive_command')
+        if archive_mode in ('on', 'always') and archive_cmd:
+            return archive_cmd
+
     def _build_archiver_command(self, command: str, wal_filename: str) -> str:
         """Replace placeholders in the given archiver command's template.
         Applicable for archive_command and restore_command.
@@ -380,9 +390,8 @@ def _archive_ready_wals(self) -> None:
         after it the WALs were recycled on the promoted replica.
         With this we prevent the entire loss of such WALs and the
         consequent old leader's start failure."""
-        archive_mode = self._postgresql.get_guc_value('archive_mode')
-        archive_cmd = self._postgresql.get_guc_value('archive_command')
-        if archive_mode not in ('on', 'always') or not archive_cmd:
+        archive_cmd = self.get_archive_command()
+        if not archive_cmd:
            return
 
         walseg_regex = re.compile(r'^[0-9A-F]{24}(\.partial){0,1}\.ready$')
@@ -602,3 +611,17 @@ def ensure_clean_shutdown(self) -> Optional[bool]:
            logger.info(' stdout=%s', output['stdout'].decode('utf-8'))
            logger.info(' stderr=%s', output['stderr'].decode('utf-8'))
         return ret == 0 or None
+
+    def archive_shutdown_checkpoint_wal(self, archive_cmd: str) -> None:
+        """Archive WAL file with the shutdown checkpoint.
+
+        :param archive_cmd: archiver command to use
+        """
+        data = self._postgresql.controldata()
+        wal_file = data.get("Latest checkpoint's REDO WAL file", '')
+        if not wal_file:
+            logger.error("Cannot extract latest checkpoint's WAL file name")
+            return
+        cmd = self._build_archiver_command(archive_cmd, wal_file)
+        if self._postgresql.cancellable.call([cmd], shell=True):
+            logger.error("Failed to archive WAL file with the shutdown checkpoint")
