Skip to content

Commit 212e610

Browse files
committed
Merge remote-tracking branch 'zalando/master' into multisite
2 parents 07fb6cc + 6e2e239 commit 212e610

File tree

4 files changed

+193
-6
lines changed

4 files changed

+193
-6
lines changed

patroni/ctl.py

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363

6464
from . import global_config
6565
from .config import Config
66-
from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Member
66+
from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Leader, Member
6767
from .exceptions import PatroniException
6868
from .postgresql.misc import postgres_version_to_int, PostgresqlRole, PostgresqlState
6969
from .postgresql.mpp import get_mpp
@@ -523,11 +523,14 @@ def watching(w: bool, watch: Optional[int], max_count: Optional[int] = None, cle
523523
return
524524

525525
counter = 1
526+
yield_time = time.time()
526527
while watch and counter <= (max_count or counter):
527-
time.sleep(watch)
528+
elapsed = time.time() - yield_time
529+
time.sleep(max(0, watch - elapsed))
528530
counter += 1
529531
if clear:
530532
click.clear()
533+
yield_time = time.time()
531534
yield 0
532535

533536

@@ -2512,3 +2515,115 @@ def format_pg_version(version: int) -> str:
25122515
return "{0}.{1}.{2}".format(version // 10000, version // 100 % 100, version % 100)
25132516
else:
25142517
return "{0}.{1}".format(version // 10000, version % 100)
2518+
2519+
2520+
def change_cluster_role(cluster_name: str, force: bool, standby_config: Optional[Dict[str, Any]]) -> None:
2521+
"""Demote or promote cluster.
2522+
2523+
:param cluster_name: name of the Patroni cluster.
2524+
:param force: if ``True`` run cluster demotion without asking for confirmation.
2525+
:param standby_config: standby cluster configuration to be applied if demotion is requested.
2526+
"""
2527+
demote = bool(standby_config)
2528+
action_name = 'demot' if demote else 'promot'
2529+
target_role = PostgresqlRole.STANDBY_LEADER if demote else PostgresqlRole.PRIMARY
2530+
2531+
dcs = get_dcs(cluster_name, None)
2532+
cluster = dcs.get_cluster()
2533+
leader_name = cluster.leader and cluster.leader.name
2534+
if not leader_name:
2535+
raise PatroniCtlException(f'Cluster has no leader, {action_name}ion is not possible')
2536+
if cluster.leader and cluster.leader.data.get('role') == target_role:
2537+
raise PatroniCtlException('Cluster is already in the required state')
2538+
2539+
click.echo('Current cluster topology')
2540+
output_members(cluster, cluster_name)
2541+
if not force:
2542+
confirm = click.confirm(f'Are you sure you want to {action_name}e {cluster_name} cluster?')
2543+
if not confirm:
2544+
raise PatroniCtlException(f'Aborted cluster {action_name}ion')
2545+
2546+
try:
2547+
if TYPE_CHECKING: # pragma: no cover
2548+
assert isinstance(cluster.leader, Leader)
2549+
r = request_patroni(cluster.leader.member, 'patch', 'config', {'standby_cluster': standby_config})
2550+
2551+
if r.status != 200:
2552+
raise PatroniCtlException(
2553+
f'Failed to {action_name}e {cluster_name} cluster: '
2554+
f'/config PATCH status code={r.status}, ({r.data.decode("utf-8")})')
2555+
except Exception as err:
2556+
raise PatroniCtlException(f'Failed to {action_name}e {cluster_name} cluster: {err}')
2557+
2558+
for _ in watching(True, 1, clear=False):
2559+
cluster = dcs.get_cluster()
2560+
is_unlocked = cluster.is_unlocked()
2561+
leader_role = cluster.leader and cluster.leader.data.get('role')
2562+
leader_state = cluster.leader and cluster.leader.data.get('state')
2563+
old_leader = cluster.get_member(leader_name, False)
2564+
old_leader_state = old_leader and old_leader.data.get('state')
2565+
2566+
if not is_unlocked and leader_role == target_role and leader_state == PostgresqlState.RUNNING:
2567+
if not demote or old_leader_state == PostgresqlState.RUNNING:
2568+
click.echo(
2569+
f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is successfully {action_name}ed')
2570+
break
2571+
2572+
state_prts = [f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is unlocked: {is_unlocked}',
2573+
f'leader role: {leader_role}',
2574+
f'leader state: {leader_state}']
2575+
if demote and cluster.leader and leader_name != cluster.leader.name and old_leader_state:
2576+
state_prts.append(f'previous leader state: {repr(old_leader_state)}')
2577+
click.echo(", ".join(state_prts))
2578+
output_members(cluster, cluster_name)
2579+
2580+
2581+
@ctl.command('demote-cluster', help="Demote cluster to a standby cluster")
2582+
@arg_cluster_name
2583+
@option_force
2584+
@click.option('--host', help='Address of the remote node', required=False)
2585+
@click.option('--port', help='Port of the remote node', type=int, required=False)
2586+
@click.option('--restore-command', help='Command to restore WAL records from the remote primary', required=False)
2587+
@click.option('--primary-slot-name', help='Name of the slot on the remote node to use for replication', required=False)
2588+
def demote_cluster(cluster_name: str, force: bool, host: Optional[str], port: Optional[int],
2589+
restore_command: Optional[str], primary_slot_name: Optional[str]) -> None:
2590+
"""Process ``demote-cluster`` command of ``patronictl`` utility.
2591+
2592+
Demote cluster to a standby cluster.
2593+
2594+
:param cluster_name: name of the Patroni cluster.
2595+
:param force: if ``True`` run cluster demotion without asking for confirmation.
2596+
:param host: address of the remote node.
2597+
:param port: port of the remote node.
2598+
:param restore_command: command to restore WAL records from the remote primary'.
2599+
:param primary_slot_name: name of the slot on the remote node to use for replication.
2600+
2601+
:raises:
2602+
:class:`PatroniCtlException`: if:
2603+
* neither ``host`` nor ``port`` nor ``restore_command`` is provided; or
2604+
* cluster has no leader; or
2605+
* cluster is already in the required state; or
2606+
* operation is aborted.
2607+
"""
2608+
if not any((host, port, restore_command)):
2609+
raise PatroniCtlException('At least --host, --port or --restore-command should be specified')
2610+
2611+
data = {k: v for k, v in {'host': host,
2612+
'port': port,
2613+
'primary_slot_name': primary_slot_name,
2614+
'restore_command': restore_command}.items() if v}
2615+
change_cluster_role(cluster_name, force, data)
2616+
2617+
2618+
@ctl.command('promote-cluster', help="Promote cluster, make it run standalone")
2619+
@arg_cluster_name
2620+
@option_force
2621+
def promote_cluster(cluster_name: str, force: bool) -> None:
2622+
"""Process ``promote-cluster`` command of ``patronictl`` utility.
2623+
2624+
Promote cluster, make it run standalone.
2625+
2626+
:param cluster_name: name of the Patroni cluster.
2627+
:param force: if ``True`` run cluster demotion without asking for confirmation.
2628+
"""
2629+
change_cluster_role(cluster_name, force, None)

patroni/ha.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1690,7 +1690,7 @@ def before_shutdown() -> None:
16901690
else:
16911691
if self._rewind.rewind_or_reinitialize_needed_and_possible(leader):
16921692
return False # do not start postgres, but run pg_rewind on the next iteration
1693-
self.state_handler.follow(node_to_follow, role)
1693+
return self.state_handler.follow(node_to_follow, role)
16941694

16951695
def should_run_scheduled_action(self, action_name: str, scheduled_at: Optional[datetime.datetime],
16961696
cleanup_fn: Callable[..., Any]) -> bool:

tests/test_ctl.py

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
from prettytable import PrettyTable
1313

1414
from patroni.ctl import CtlPostgresqlRole
15-
from patroni.postgresql.misc import PostgresqlState
15+
from patroni.dcs import ClusterConfig, Leader, Member, SyncState
16+
from patroni.postgresql.misc import PostgresqlRole, PostgresqlState
1617

1718
try:
1819
from prettytable import HRuleStyle
@@ -35,7 +36,8 @@
3536
from . import MockConnect, MockCursor, MockResponse, psycopg_connect
3637
from .test_etcd import etcd_read, socket_getaddrinfo
3738
from .test_ha import get_cluster, get_cluster_initialized_with_leader, get_cluster_initialized_with_only_leader, \
38-
get_cluster_initialized_without_leader, get_cluster_not_initialized_without_leader, Member
39+
get_cluster_initialized_without_leader, get_cluster_not_initialized_without_leader, \
40+
get_standby_cluster_initialized_with_only_leader
3941

4042

4143
def get_default_config(*args):
@@ -784,6 +786,74 @@ def test_reinit_wait(self):
784786
self.assertIn("Waiting for reinitialize to complete on: other", result.output)
785787
self.assertIn("Reinitialize is completed on: other", result.output)
786788

789+
@patch('patroni.ctl.watching', Mock(return_value=[0, 0]))
790+
@patch('patroni.ctl.request_patroni')
791+
def test_cluster_demote(self, mock_patch):
792+
m1 = Member(0, 'new_leader', 28, {'conn_url': 'postgres://replicator:[email protected]:5435/postgres',
793+
'role': PostgresqlRole.STANDBY_LEADER, 'state': 'running'})
794+
m2 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:[email protected]:5435/postgres',
795+
'role': PostgresqlRole.PRIMARY, 'state': 'stopping'})
796+
standby_leader = Leader(0, 0, m1)
797+
leader = Leader(0, 0, m2)
798+
original_cluster = get_cluster('12345678901', leader, [m1, m2], None, SyncState.empty(), None, 1)
799+
standby_cluster = get_cluster(
800+
'12345678901', standby_leader, [m1, m2], None, SyncState.empty(),
801+
ClusterConfig(1, {"standby_cluster": {"host": "localhost", "port": 5432, "primary_slot_name": ""}}, 1))
802+
803+
# no option provided
804+
self.runner.invoke(ctl, ['demote-cluster', 'dummy'])
805+
# no leader
806+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_without_leader())):
807+
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo'])
808+
assert 'Cluster has no leader, demotion is not possible' in result.output
809+
# aborted
810+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=original_cluster)):
811+
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo'], input='N')
812+
assert 'Aborted' in result.output
813+
# already required state
814+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=standby_cluster)):
815+
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo'])
816+
assert 'Cluster is already in the required state' in result.output
817+
818+
mock_patch.return_value.status = 200
819+
# success
820+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(side_effect=[original_cluster, original_cluster,
821+
standby_cluster])):
822+
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force'])
823+
assert result.exit_code == 0
824+
825+
@patch('patroni.ctl.polling_loop', Mock(return_value=[0, 0]))
826+
@patch('patroni.ctl.request_patroni')
827+
def test_cluster_promote(self, mock_patch):
828+
only_leader_cluster = get_cluster_initialized_with_only_leader()
829+
standby_cluster = get_standby_cluster_initialized_with_only_leader()
830+
# no leader
831+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_without_leader())):
832+
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy'])
833+
assert 'Cluster has no leader, promotion is not possible' in result.output
834+
# aborted
835+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=standby_cluster)):
836+
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy'])
837+
assert 'Aborted' in result.output
838+
# already required state
839+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=only_leader_cluster)):
840+
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy'])
841+
assert 'Cluster is already in the required state' in result.output
842+
# PATCH error
843+
mock_patch.return_value.status = 500
844+
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force'])
845+
assert 'Failed to demote' in result.output
846+
# Exception
847+
with patch('patroni.ctl.request_patroni', Mock(side_effect=Exception)):
848+
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force'])
849+
assert 'Failed to demote' in result.output
850+
# success
851+
mock_patch.return_value.status = 200
852+
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(side_effect=[standby_cluster, standby_cluster,
853+
only_leader_cluster])):
854+
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy', '--force'])
855+
assert result.exit_code == 0
856+
787857

788858
class TestPatronictlPrettyTable(unittest.TestCase):
789859

tests/test_ha.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,16 @@ def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None)
8686

8787

8888
def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
89-
return get_cluster_initialized_with_only_leader(
89+
cluster = get_cluster_initialized_with_only_leader(
9090
cluster_config=ClusterConfig(1, {
9191
"standby_cluster": {
9292
"host": "localhost",
9393
"port": 5432,
9494
"primary_slot_name": "",
9595
}}, 1)
9696
)
97+
cluster.leader.data['role'] = PostgresqlRole.STANDBY_LEADER
98+
return cluster
9799

98100

99101
def get_cluster_initialized_with_leader_and_failsafe():

0 commit comments

Comments
 (0)