diff --git a/patroni/ctl.py b/patroni/ctl.py index b52187fc9..48bc87534 100644 --- a/patroni/ctl.py +++ b/patroni/ctl.py @@ -63,7 +63,7 @@ from . import global_config from .config import Config -from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Member +from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Leader, Member from .exceptions import PatroniException from .postgresql.misc import postgres_version_to_int, PostgresqlRole, PostgresqlState from .postgresql.mpp import get_mpp @@ -523,11 +523,14 @@ def watching(w: bool, watch: Optional[int], max_count: Optional[int] = None, cle return counter = 1 + yield_time = time.time() while watch and counter <= (max_count or counter): - time.sleep(watch) + elapsed = time.time() - yield_time + time.sleep(max(0, watch - elapsed)) counter += 1 if clear: click.clear() + yield_time = time.time() yield 0 @@ -2512,3 +2515,115 @@ def format_pg_version(version: int) -> str: return "{0}.{1}.{2}".format(version // 10000, version // 100 % 100, version % 100) else: return "{0}.{1}".format(version // 10000, version % 100) + + +def change_cluster_role(cluster_name: str, force: bool, standby_config: Optional[Dict[str, Any]]) -> None: + """Demote or promote cluster. + + :param cluster_name: name of the Patroni cluster. + :param force: if ``True`` run cluster demotion without asking for confirmation. + :param standby_config: standby cluster configuration to be applied if demotion is requested. + """ + demote = bool(standby_config) + action_name = 'demot' if demote else 'promot' + target_role = PostgresqlRole.STANDBY_LEADER if demote else PostgresqlRole.PRIMARY + + dcs = get_dcs(cluster_name, None) + cluster = dcs.get_cluster() + leader_name = cluster.leader and cluster.leader.name + if not leader_name: + raise PatroniCtlException(f'Cluster has no leader, {action_name}ion is not possible') + if cluster.leader and cluster.leader.data.get('role') == target_role: + raise PatroniCtlException('Cluster is already in the required state') + + click.echo('Current cluster topology') + output_members(cluster, cluster_name) + if not force: + confirm = click.confirm(f'Are you sure you want to {action_name}e {cluster_name} cluster?') + if not confirm: + raise PatroniCtlException(f'Aborted cluster {action_name}ion') + + try: + if TYPE_CHECKING: # pragma: no cover + assert isinstance(cluster.leader, Leader) + r = request_patroni(cluster.leader.member, 'patch', 'config', {'standby_cluster': standby_config}) + + if r.status != 200: + raise PatroniCtlException( + f'Failed to {action_name}e {cluster_name} cluster: ' + f'/config PATCH status code={r.status}, ({r.data.decode("utf-8")})') + except Exception as err: + raise PatroniCtlException(f'Failed to {action_name}e {cluster_name} cluster: {err}') + + for _ in watching(True, 1, clear=False): + cluster = dcs.get_cluster() + is_unlocked = cluster.is_unlocked() + leader_role = cluster.leader and cluster.leader.data.get('role') + leader_state = cluster.leader and cluster.leader.data.get('state') + old_leader = cluster.get_member(leader_name, False) + old_leader_state = old_leader and old_leader.data.get('state') + + if not is_unlocked and leader_role == target_role and leader_state == PostgresqlState.RUNNING: + if not demote or old_leader_state == PostgresqlState.RUNNING: + click.echo( + f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is successfully {action_name}ed') + break + + state_prts = [f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is unlocked: {is_unlocked}', + f'leader role: {leader_role}', + f'leader state: {leader_state}'] + if demote and cluster.leader and leader_name != cluster.leader.name and old_leader_state: + state_prts.append(f'previous leader state: {repr(old_leader_state)}') + click.echo(", ".join(state_prts)) + output_members(cluster, cluster_name) + + +@ctl.command('demote-cluster', help="Demote cluster to a standby cluster") +@arg_cluster_name +@option_force +@click.option('--host', help='Address of the remote node', required=False) +@click.option('--port', help='Port of the remote node', type=int, required=False) +@click.option('--restore-command', help='Command to restore WAL records from the remote primary', required=False) +@click.option('--primary-slot-name', help='Name of the slot on the remote node to use for replication', required=False) +def demote_cluster(cluster_name: str, force: bool, host: Optional[str], port: Optional[int], + restore_command: Optional[str], primary_slot_name: Optional[str]) -> None: + """Process ``demote-cluster`` command of ``patronictl`` utility. + + Demote cluster to a standby cluster. + + :param cluster_name: name of the Patroni cluster. + :param force: if ``True`` run cluster demotion without asking for confirmation. + :param host: address of the remote node. + :param port: port of the remote node. + :param restore_command: command to restore WAL records from the remote primary'. + :param primary_slot_name: name of the slot on the remote node to use for replication. + + :raises: + :class:`PatroniCtlException`: if: + * neither ``host`` nor ``port`` nor ``restore_command`` is provided; or + * cluster has no leader; or + * cluster is already in the required state; or + * operation is aborted. + """ + if not any((host, port, restore_command)): + raise PatroniCtlException('At least --host, --port or --restore-command should be specified') + + data = {k: v for k, v in {'host': host, + 'port': port, + 'primary_slot_name': primary_slot_name, + 'restore_command': restore_command}.items() if v} + change_cluster_role(cluster_name, force, data) + + +@ctl.command('promote-cluster', help="Promote cluster, make it run standalone") +@arg_cluster_name +@option_force +def promote_cluster(cluster_name: str, force: bool) -> None: + """Process ``promote-cluster`` command of ``patronictl`` utility. + + Promote cluster, make it run standalone. + + :param cluster_name: name of the Patroni cluster. + :param force: if ``True`` run cluster demotion without asking for confirmation. + """ + change_cluster_role(cluster_name, force, None) diff --git a/patroni/ha.py b/patroni/ha.py index f8cca4e53..6632c9753 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -1690,7 +1690,7 @@ def before_shutdown() -> None: else: if self._rewind.rewind_or_reinitialize_needed_and_possible(leader): return False # do not start postgres, but run pg_rewind on the next iteration - self.state_handler.follow(node_to_follow, role) + return self.state_handler.follow(node_to_follow, role) def should_run_scheduled_action(self, action_name: str, scheduled_at: Optional[datetime.datetime], cleanup_fn: Callable[..., Any]) -> bool: diff --git a/tests/test_ctl.py b/tests/test_ctl.py index 85ba2c92d..c900edab2 100644 --- a/tests/test_ctl.py +++ b/tests/test_ctl.py @@ -12,7 +12,8 @@ from prettytable import PrettyTable from patroni.ctl import CtlPostgresqlRole -from patroni.postgresql.misc import PostgresqlState +from patroni.dcs import ClusterConfig, Leader, Member, SyncState +from patroni.postgresql.misc import PostgresqlRole, PostgresqlState try: from prettytable import HRuleStyle @@ -35,7 +36,8 @@ from . import MockConnect, MockCursor, MockResponse, psycopg_connect from .test_etcd import etcd_read, socket_getaddrinfo from .test_ha import get_cluster, get_cluster_initialized_with_leader, get_cluster_initialized_with_only_leader, \ - get_cluster_initialized_without_leader, get_cluster_not_initialized_without_leader, Member + get_cluster_initialized_without_leader, get_cluster_not_initialized_without_leader, \ + get_standby_cluster_initialized_with_only_leader def get_default_config(*args): @@ -784,6 +786,74 @@ def test_reinit_wait(self): self.assertIn("Waiting for reinitialize to complete on: other", result.output) self.assertIn("Reinitialize is completed on: other", result.output) + @patch('patroni.ctl.watching', Mock(return_value=[0, 0])) + @patch('patroni.ctl.request_patroni') + def test_cluster_demote(self, mock_patch): + m1 = Member(0, 'new_leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres', + 'role': PostgresqlRole.STANDBY_LEADER, 'state': 'running'}) + m2 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres', + 'role': PostgresqlRole.PRIMARY, 'state': 'stopping'}) + standby_leader = Leader(0, 0, m1) + leader = Leader(0, 0, m2) + original_cluster = get_cluster('12345678901', leader, [m1, m2], None, SyncState.empty(), None, 1) + standby_cluster = get_cluster( + '12345678901', standby_leader, [m1, m2], None, SyncState.empty(), + ClusterConfig(1, {"standby_cluster": {"host": "localhost", "port": 5432, "primary_slot_name": ""}}, 1)) + + # no option provided + self.runner.invoke(ctl, ['demote-cluster', 'dummy']) + # no leader + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_without_leader())): + result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo']) + assert 'Cluster has no leader, demotion is not possible' in result.output + # aborted + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=original_cluster)): + result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo'], input='N') + assert 'Aborted' in result.output + # already required state + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=standby_cluster)): + result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo']) + assert 'Cluster is already in the required state' in result.output + + mock_patch.return_value.status = 200 + # success + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(side_effect=[original_cluster, original_cluster, + standby_cluster])): + result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force']) + assert result.exit_code == 0 + + @patch('patroni.ctl.polling_loop', Mock(return_value=[0, 0])) + @patch('patroni.ctl.request_patroni') + def test_cluster_promote(self, mock_patch): + only_leader_cluster = get_cluster_initialized_with_only_leader() + standby_cluster = get_standby_cluster_initialized_with_only_leader() + # no leader + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_without_leader())): + result = self.runner.invoke(ctl, ['promote-cluster', 'dummy']) + assert 'Cluster has no leader, promotion is not possible' in result.output + # aborted + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=standby_cluster)): + result = self.runner.invoke(ctl, ['promote-cluster', 'dummy']) + assert 'Aborted' in result.output + # already required state + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=only_leader_cluster)): + result = self.runner.invoke(ctl, ['promote-cluster', 'dummy']) + assert 'Cluster is already in the required state' in result.output + # PATCH error + mock_patch.return_value.status = 500 + result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force']) + assert 'Failed to demote' in result.output + # Exception + with patch('patroni.ctl.request_patroni', Mock(side_effect=Exception)): + result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force']) + assert 'Failed to demote' in result.output + # success + mock_patch.return_value.status = 200 + with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(side_effect=[standby_cluster, standby_cluster, + only_leader_cluster])): + result = self.runner.invoke(ctl, ['promote-cluster', 'dummy', '--force']) + assert result.exit_code == 0 + class TestPatronictlPrettyTable(unittest.TestCase): diff --git a/tests/test_ha.py b/tests/test_ha.py index d4f37227a..dc6c6f6d9 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -86,7 +86,7 @@ def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None) def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None): - return get_cluster_initialized_with_only_leader( + cluster = get_cluster_initialized_with_only_leader( cluster_config=ClusterConfig(1, { "standby_cluster": { "host": "localhost", @@ -94,6 +94,8 @@ def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None): "primary_slot_name": "", }}, 1) ) + cluster.leader.data['role'] = PostgresqlRole.STANDBY_LEADER + return cluster def get_cluster_initialized_with_leader_and_failsafe():