119 changes: 117 additions & 2 deletions patroni/ctl.py
@@ -63,7 +63,7 @@

from . import global_config
from .config import Config
from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Member
from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Leader, Member
from .exceptions import PatroniException
from .postgresql.misc import postgres_version_to_int, PostgresqlRole, PostgresqlState
from .postgresql.mpp import get_mpp
@@ -523,11 +523,14 @@ def watching(w: bool, watch: Optional[int], max_count: Optional[int] = None, cle
return

counter = 1
yield_time = time.time()
while watch and counter <= (max_count or counter):
time.sleep(watch)
elapsed = time.time() - yield_time
time.sleep(max(0, watch - elapsed))
counter += 1
if clear:
click.clear()
yield_time = time.time()
yield 0
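
The reworked loop subtracts the time already spent since the last refresh from the requested interval, so output is redrawn roughly every ``watch`` seconds regardless of how long rendering took. A standalone sketch of the same drift-compensation pattern (function and variable names are illustrative, not part of this PR):

    import time

    def periodic(interval: float, max_count: int):
        """Yield roughly every `interval` seconds, compensating for the time
        the caller spends between iterations."""
        last_yield = time.time()
        for _ in range(max_count):
            elapsed = time.time() - last_yield
            time.sleep(max(0.0, interval - elapsed))  # sleep less if the caller was slow
            last_yield = time.time()
            yield

    for _ in periodic(1.0, 3):
        pass  # render output here; the next tick accounts for the time this takes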


@@ -2512,3 +2515,115 @@ def format_pg_version(version: int) -> str:
return "{0}.{1}.{2}".format(version // 10000, version // 100 % 100, version % 100)
else:
return "{0}.{1}".format(version // 10000, version % 100)


def change_cluster_role(cluster_name: str, force: bool, standby_config: Optional[Dict[str, Any]]) -> None:
"""Demote or promote cluster.

:param cluster_name: name of the Patroni cluster.
:param force: if ``True`` run cluster demotion or promotion without asking for confirmation.
:param standby_config: standby cluster configuration to be applied if demotion is requested.
"""
demote = bool(standby_config)
action_name = 'demot' if demote else 'promot'
target_role = PostgresqlRole.STANDBY_LEADER if demote else PostgresqlRole.PRIMARY

dcs = get_dcs(cluster_name, None)
cluster = dcs.get_cluster()
leader_name = cluster.leader and cluster.leader.name
if not leader_name:
raise PatroniCtlException(f'Cluster has no leader, {action_name}ion is not possible')
if cluster.leader and cluster.leader.data.get('role') == target_role:
raise PatroniCtlException('Cluster is already in the required state')

click.echo('Current cluster topology')
output_members(cluster, cluster_name)
if not force:
confirm = click.confirm(f'Are you sure you want to {action_name}e {cluster_name} cluster?')
if not confirm:
raise PatroniCtlException(f'Aborted cluster {action_name}ion')

try:
if TYPE_CHECKING: # pragma: no cover
assert isinstance(cluster.leader, Leader)
r = request_patroni(cluster.leader.member, 'patch', 'config', {'standby_cluster': standby_config})

if r.status != 200:
raise PatroniCtlException(
f'Failed to {action_name}e {cluster_name} cluster: '
f'/config PATCH status code={r.status}, ({r.data.decode("utf-8")})')
except Exception as err:
raise PatroniCtlException(f'Failed to {action_name}e {cluster_name} cluster: {err}')

for _ in watching(True, 1, clear=False):
cluster = dcs.get_cluster()
is_unlocked = cluster.is_unlocked()
leader_role = cluster.leader and cluster.leader.data.get('role')
leader_state = cluster.leader and cluster.leader.data.get('state')
old_leader = cluster.get_member(leader_name, False)
old_leader_state = old_leader and old_leader.data.get('state')

if not is_unlocked and leader_role == target_role and leader_state == PostgresqlState.RUNNING:
if not demote or old_leader_state == PostgresqlState.RUNNING:
click.echo(
f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is successfully {action_name}ed')
break

state_prts = [f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is unlocked: {is_unlocked}',
f'leader role: {leader_role}',
f'leader state: {leader_state}']
if demote and cluster.leader and leader_name != cluster.leader.name and old_leader_state:
state_prts.append(f'previous leader state: {repr(old_leader_state)}')
click.echo(", ".join(state_prts))
output_members(cluster, cluster_name)
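
In effect, ``change_cluster_role`` issues a ``PATCH`` against the leader's ``/config`` endpoint: demotion adds a ``standby_cluster`` section to the dynamic configuration, promotion resets it to ``null``, and the loop above then polls the DCS until the leader reports the target role. A rough sketch of the two request bodies (host, port and restore command values are made up for illustration):

    import json

    # Demotion: introduce a standby_cluster section (illustrative values).
    demote_body = {'standby_cluster': {'host': '10.0.0.1', 'port': 5432,
                                       'restore_command': 'cp /wal_archive/%f %p'}}

    # Promotion: drop the section again by setting it to null.
    promote_body = {'standby_cluster': None}

    print(json.dumps(demote_body))   # {"standby_cluster": {"host": "10.0.0.1", ...}}
    print(json.dumps(promote_body))  # {"standby_cluster": null}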


@ctl.command('demote-cluster', help="Demote cluster to a standby cluster")
@arg_cluster_name
@option_force
@click.option('--host', help='Address of the remote node', required=False)
@click.option('--port', help='Port of the remote node', type=int, required=False)
@click.option('--restore-command', help='Command to restore WAL records from the remote primary', required=False)
@click.option('--primary-slot-name', help='Name of the slot on the remote node to use for replication', required=False)
def demote_cluster(cluster_name: str, force: bool, host: Optional[str], port: Optional[int],
restore_command: Optional[str], primary_slot_name: Optional[str]) -> None:
"""Process ``demote-cluster`` command of ``patronictl`` utility.

Demote cluster to a standby cluster.

:param cluster_name: name of the Patroni cluster.
:param force: if ``True`` run cluster demotion without asking for confirmation.
:param host: address of the remote node.
:param port: port of the remote node.
:param restore_command: command to restore WAL records from the remote primary.
:param primary_slot_name: name of the slot on the remote node to use for replication.

:raises:
:class:`PatroniCtlException`: if:
* neither ``host`` nor ``port`` nor ``restore_command`` is provided; or
* cluster has no leader; or
* cluster is already in the required state; or
* operation is aborted.
"""
if not any((host, port, restore_command)):
raise PatroniCtlException('At least --host, --port or --restore-command should be specified')

data = {k: v for k, v in {'host': host,
'port': port,
'primary_slot_name': primary_slot_name,
'restore_command': restore_command}.items() if v}
change_cluster_role(cluster_name, force, data)
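
A demotion invocation might look like the following (cluster name, address and restore command are placeholders, not taken from this PR):

    patronictl demote-cluster batman --host 10.0.0.1 --port 5432 --restore-command 'cp /wal_archive/%f %p' --force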


@ctl.command('promote-cluster', help="Promote cluster, make it run standalone")
@arg_cluster_name
@option_force
def promote_cluster(cluster_name: str, force: bool) -> None:
"""Process ``promote-cluster`` command of ``patronictl`` utility.

Promote cluster, make it run standalone.

:param cluster_name: name of the Patroni cluster.
:param force: if ``True`` run cluster promotion without asking for confirmation.
"""
change_cluster_role(cluster_name, force, None)
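
The corresponding promotion simply clears the ``standby_cluster`` section again (cluster name is a placeholder):

    patronictl promote-cluster batman --force
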
2 changes: 1 addition & 1 deletion patroni/ha.py
@@ -1690,7 +1690,7 @@ def before_shutdown() -> None:
else:
if self._rewind.rewind_or_reinitialize_needed_and_possible(leader):
return False # do not start postgres, but run pg_rewind on the next iteration
self.state_handler.follow(node_to_follow, role)
return self.state_handler.follow(node_to_follow, role)

def should_run_scheduled_action(self, action_name: str, scheduled_at: Optional[datetime.datetime],
cleanup_fn: Callable[..., Any]) -> bool:
74 changes: 72 additions & 2 deletions tests/test_ctl.py
@@ -12,7 +12,8 @@
from prettytable import PrettyTable

from patroni.ctl import CtlPostgresqlRole
from patroni.postgresql.misc import PostgresqlState
from patroni.dcs import ClusterConfig, Leader, Member, SyncState
from patroni.postgresql.misc import PostgresqlRole, PostgresqlState

try:
from prettytable import HRuleStyle
@@ -35,7 +36,8 @@
from . import MockConnect, MockCursor, MockResponse, psycopg_connect
from .test_etcd import etcd_read, socket_getaddrinfo
from .test_ha import get_cluster, get_cluster_initialized_with_leader, get_cluster_initialized_with_only_leader, \
get_cluster_initialized_without_leader, get_cluster_not_initialized_without_leader, Member
get_cluster_initialized_without_leader, get_cluster_not_initialized_without_leader, \
get_standby_cluster_initialized_with_only_leader


def get_default_config(*args):
@@ -784,6 +786,74 @@ def test_reinit_wait(self):
self.assertIn("Waiting for reinitialize to complete on: other", result.output)
self.assertIn("Reinitialize is completed on: other", result.output)

@patch('patroni.ctl.watching', Mock(return_value=[0, 0]))
@patch('patroni.ctl.request_patroni')
def test_cluster_demote(self, mock_patch):
m1 = Member(0, 'new_leader', 28, {'conn_url': 'postgres://replicator:[email protected]:5435/postgres',
'role': PostgresqlRole.STANDBY_LEADER, 'state': 'running'})
m2 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:[email protected]:5435/postgres',
'role': PostgresqlRole.PRIMARY, 'state': 'stopping'})
standby_leader = Leader(0, 0, m1)
leader = Leader(0, 0, m2)
original_cluster = get_cluster('12345678901', leader, [m1, m2], None, SyncState.empty(), None, 1)
standby_cluster = get_cluster(
'12345678901', standby_leader, [m1, m2], None, SyncState.empty(),
ClusterConfig(1, {"standby_cluster": {"host": "localhost", "port": 5432, "primary_slot_name": ""}}, 1))

# no option provided
self.runner.invoke(ctl, ['demote-cluster', 'dummy'])
# no leader
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_without_leader())):
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo'])
assert 'Cluster has no leader, demotion is not possible' in result.output
# aborted
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=original_cluster)):
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo'], input='N')
assert 'Aborted' in result.output
# already required state
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=standby_cluster)):
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo'])
assert 'Cluster is already in the required state' in result.output

mock_patch.return_value.status = 200
# success
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(side_effect=[original_cluster, original_cluster,
standby_cluster])):
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force'])
assert result.exit_code == 0

@patch('patroni.ctl.polling_loop', Mock(return_value=[0, 0]))
@patch('patroni.ctl.request_patroni')
def test_cluster_promote(self, mock_patch):
only_leader_cluster = get_cluster_initialized_with_only_leader()
standby_cluster = get_standby_cluster_initialized_with_only_leader()
# no leader
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_without_leader())):
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy'])
assert 'Cluster has no leader, promotion is not possible' in result.output
# aborted
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=standby_cluster)):
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy'])
assert 'Aborted' in result.output
# already required state
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=only_leader_cluster)):
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy'])
assert 'Cluster is already in the required state' in result.output
# PATCH error
mock_patch.return_value.status = 500
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force'])
assert 'Failed to demote' in result.output
# Exception
with patch('patroni.ctl.request_patroni', Mock(side_effect=Exception)):
result = self.runner.invoke(ctl, ['demote-cluster', 'dummy', '--restore-command', 'foo', '--force'])
assert 'Failed to demote' in result.output
# success
mock_patch.return_value.status = 200
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(side_effect=[standby_cluster, standby_cluster,
only_leader_cluster])):
result = self.runner.invoke(ctl, ['promote-cluster', 'dummy', '--force'])
assert result.exit_code == 0


class TestPatronictlPrettyTable(unittest.TestCase):

4 changes: 3 additions & 1 deletion tests/test_ha.py
@@ -86,14 +86,16 @@ def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None)


def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
return get_cluster_initialized_with_only_leader(
cluster = get_cluster_initialized_with_only_leader(
cluster_config=ClusterConfig(1, {
"standby_cluster": {
"host": "localhost",
"port": 5432,
"primary_slot_name": "",
}}, 1)
)
cluster.leader.data['role'] = PostgresqlRole.STANDBY_LEADER
return cluster


def get_cluster_initialized_with_leader_and_failsafe():