Merged

30 commits
a7d7466
Implement multisite-switchover CTL command
Mar 26, 2025
2be30da
Solve a small logic issue
Mar 27, 2025
12e0df5
Set candidate name reliably if there is only one
Mar 27, 2025
f772966
Fix some type hinting and import issues
Apr 17, 2025
412d0a6
Address further type checking issues
Apr 17, 2025
0804b93
Simplify site switchover name
Apr 18, 2025
ec9b6c3
Merge branch 'multisite' into ctl-switchover-only
May 12, 2025
78d8607
Address some flake8 complaints
May 13, 2025
d3d1e47
Limit py-consul version depending on python version (#3336)
CyberDem0n Apr 18, 2025
b47d7b9
Convert roles to enums (#3303)
hughcapet Apr 18, 2025
07b81cd
Convert states to enums (#3293)
hughcapet Mar 14, 2025
ed70e0e
Convert roles to enums (#3303)
hughcapet Apr 18, 2025
2001406
Adapt unit tests to multisite changes
May 13, 2025
6ed7554
Cherry-pick some upstream changes
hughcapet Apr 18, 2025
bf5746e
Implement kubernetes.bootstrap_labels (#3257)
hughcapet Feb 18, 2025
d49c0ba
Solve a couple of Flaky unit tests (#3294)
Garaz08 Feb 25, 2025
ab43b49
Pick changes manually that were missed during cherry picking
May 14, 2025
2d10c10
Fix some linting issues
May 14, 2025
9f20479
Make sure str representation is used for click options (#3352)
hughcapet May 12, 2025
482e68c
Merge branch 'multisite' into ctl-switchover-only
ants May 15, 2025
3bf317a
Fix a small isort issue
May 15, 2025
db359d6
Fix most Pyright issues in multisite.py
May 15, 2025
ce73bff
Fix most Pyright issues in ha.py
May 15, 2025
b1f204e
Fix method call
May 16, 2025
94fbf0b
Fix remaining pyright issues that can be fixed without implementation…
May 19, 2025
49f8c25
Change MS history to the format of normal history
May 21, 2025
84c351d
Make argument which we do not use optional
May 21, 2025
a5a9c39
Fix tests failing with newest click
May 22, 2025
50c9785
Fix some RST
May 23, 2025
aed5381
Add multisite.rst to TOC
May 23, 2025
4 changes: 2 additions & 2 deletions patroni/api.py
@@ -681,9 +681,9 @@
metrics.append("# HELP patroni_multisite_switches Number of times multisite leader has been switched")
metrics.append("# TYPE patroni_multisite_switches counter")
metrics.append("patroni_multisite_switches{0} {1}"
.format(labels, patroni.multisite.site_switches))

Check failure (GitHub Actions / pyright) on line 684 in patroni/api.py: Type of "site_switches" is partially unknown; Type of "site_switches" is "Unknown | None" (reportUnknownMemberType)
Check failure (GitHub Actions / pyright) on line 684 in patroni/api.py: Argument type is partially unknown; Argument corresponds to parameter "args" in function "format"; Argument type is "Unknown | None" (reportUnknownArgumentType)
Check failure (GitHub Actions / pyright) on line 684 in patroni/api.py: Cannot access attribute "site_switches" for class "SingleSiteController"; Attribute "site_switches" is unknown (reportAttributeAccessIssue)

self.write_response(200, '\n'.join(metrics)+'\n', content_type='text/plain')
self.write_response(200, '\n'.join(metrics) + '\n', content_type='text/plain')

def do_GET_multisite(self):
self._write_json_response(200, self.server.patroni.multisite.status())
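A sketch of the JSON this endpoint might return: the keys mirror how get_cluster_service_info() in patroni/ctl.py reads the multisite section, and all values here are hypothetical:

    {"name": "dc2", "status": "Standby", "standby_config": {"leader_site": "dc1", "host": "10.0.0.1", "port": 5432}}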
@@ -1199,20 +1199,20 @@
self.do_POST_failover(action='switchover')

@check_access
def do_POST_multisite_switchover(self):
def do_POST_site_switchover(self):

Check failure (GitHub Actions / pyright) on line 1202 in patroni/api.py: Return type, "Unknown | None", is partially unknown (reportUnknownParameterType)
request = self._read_json_content()
(status_code, data) = (400, '')
if not request:
return
if not self.server.patroni.multisite.is_active:
return self._write_response(400, 'Cluster is not in multisite mode')

Check failure (GitHub Actions / pyright) on line 1208 in patroni/api.py: Type of "_write_response" is unknown (reportUnknownMemberType)
Check failure (GitHub Actions / pyright) on line 1208 in patroni/api.py: Return type is unknown (reportUnknownVariableType)
Check failure (GitHub Actions / pyright) on line 1208 in patroni/api.py: Cannot access attribute "_write_response" for class "RestApiHandler*"; Attribute "_write_response" is unknown (reportAttributeAccessIssue)

scheduled_at = request.get('scheduled_at')
target_site = request.get('target_site')
logger.info("received multisite switchover request with target_site=%s scheduled_at=%s",
target_site, scheduled_at)

if self.server.patroni.multisite.dcs.manual_failover(None, None, scheduled_at=scheduled_at,

Check failure (GitHub Actions / pyright) on line 1215 in patroni/api.py: Type of "dcs" is partially unknown; Type of "dcs" is "AbstractDCS | Unknown" (reportUnknownMemberType)
Check failure (GitHub Actions / pyright) on line 1215 in patroni/api.py: Type of "manual_failover" is partially unknown; Type of "manual_failover" is "((leader: str | None, candidate: str | None, scheduled_at: datetime | None = None, target_site: str | None = None, version: Any | None = None) -> bool) | Unknown" (reportUnknownMemberType)
Check failure (GitHub Actions / pyright) on line 1215 in patroni/api.py: Cannot access attribute "dcs" for class "SingleSiteController"; Attribute "dcs" is unknown (reportAttributeAccessIssue)
target_site=target_site):
data = 'multisite switchover scheduled'
status_code = 202
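As a usage sketch for the endpoint added above, a client request might look like the following. The host, port and site name here are assumptions; the payload key (target_site) and the 202 response come from the handler in this diff:

    import json
    from urllib.request import Request, urlopen

    # Hypothetical call to the new site switchover endpoint (auth omitted).
    payload = {'target_site': 'dc2'}
    req = Request('http://localhost:8008/site_switchover',
                  data=json.dumps(payload).encode('utf-8'),
                  headers={'Content-Type': 'application/json'}, method='POST')
    with urlopen(req) as resp:
        # On success the handler replies 202 'multisite switchover scheduled'.
        print(resp.status, resp.read().decode('utf-8'))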
196 changes: 191 additions & 5 deletions patroni/ctl.py
@@ -370,7 +370,7 @@ def is_citus_cluster() -> bool:
__dcs_cache: Dict[Tuple[str, Optional[int]], AbstractDCS] = {}


def get_dcs(scope: str, group: Optional[int]) -> AbstractDCS:
def get_dcs(scope: str, group: Optional[int], multisite: Optional[bool] = False) -> AbstractDCS:
"""Get the DCS object.

:param scope: cluster name.
@@ -385,13 +385,20 @@ def get_dcs(scope: str, group: Optional[int]) -> AbstractDCS:
"""
if (scope, group) in __dcs_cache:
return __dcs_cache[(scope, group)]

config = _get_configuration()
config.update({'scope': scope, 'patronictl': True})
if group is not None:
config['citus'] = {'group': group, 'database': 'postgres'}
config.setdefault('name', scope)

try:
dcs = _get_dcs(config)
# TODO: might be necessary for site switchover candidates collection
# if multisite:
# _, dcs = MultisiteController.get_dcs_config(config)
# else:
# dcs = _get_dcs(config)
if is_citus_cluster() and group is None:
dcs.is_mpp_coordinator = lambda: True
click.get_current_context().obj['__mpp'] = dcs.mpp
@@ -1321,6 +1328,11 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i

if candidate is None and not force:
candidate = click.prompt('Candidate ' + str(candidate_names), type=str, default='')
# TODO: set candidate names reliably where there's only one
# if len(candidate_names) == 1:
# candidate = click.prompt('Candidate ', type=str, default=candidate_names[0])
# else:
# candidate = click.prompt('Candidate ' + str(candidate_names), type=str, default='')

if action == 'failover' and not candidate:
raise PatroniCtlException('Failover could be performed only to a specific candidate')
@@ -1404,6 +1416,151 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i
output_members(cluster, cluster_name, group=group)


def _do_site_switchover(cluster_name: str, group: Optional[int],
switchover_leader: Optional[str], candidate: Optional[str],
force: bool, scheduled: Optional[str] = None) -> None:
"""Perform a site switchover operation in the cluster.

Informational messages are printed in the console during the operation, as well as the list of members before and
after the operation, so the user can follow the operation status.

.. note::
If the operation cannot be performed through the REST API, fall back to writing directly to the DCS.

:param cluster_name: name of the Patroni cluster.
:param group: filter Citus group within which we should perform the switchover. If ``None``, user will be
prompted for filling it -- unless *force* is ``True``, in which case an exception is raised.
:param switchover_leader: name of the leader site passed as switchover option.
:param candidate: name of a standby site to be promoted.
:param force: perform the switchover without asking for confirmations.
:param scheduled: timestamp when the switchover should be scheduled to occur. If ``now``, perform immediately.

:raises:
:class:`PatroniCtlException`: if:
* Patroni is running on a Citus cluster, but no *group* was specified; or
* a switchover was requested but the cluster has no leader site (or multisite is not active); or
* *switchover_leader* does not match the current leader site of the cluster; or
* cluster has no candidate sites available for the operation; or
* current leader and *candidate* are the same; or
* *candidate* is not a site of the cluster; or
* trying to schedule a switchover in a cluster that is in maintenance mode; or
* user aborts the operation.
"""
dcs = get_dcs(cluster_name, group)
cluster = dcs.get_cluster()
click.echo('Current cluster topology')
output_members(cluster, cluster_name, group=group)

if is_citus_cluster() and group is None:
Review comment: Citus will be quite unhappy if some nodes don't have a local leader. Correct fix is probably to have a single point that determines which site is active for the whole cluster. But that is a larger job. I think for now we should just have an error saying that Citus and multisite together is unsupported.
if force:
raise PatroniCtlException('For Citus clusters the --group must be specified')
else:
group = click.prompt('Citus group', type=int)
dcs = get_dcs(cluster_name, group)
cluster = dcs.get_cluster()

config = global_config.from_cluster(cluster)

cluster_leader = cluster.leader and cluster.leader.name

if not cluster_leader:
raise PatroniCtlException('This cluster has no leader')

if cluster.leader and cluster.leader.multisite:
leader_site = (cluster.leader.multisite.get('name') if not cluster.leader.multisite.get('standby_config') else
cluster.leader.multisite.get('standby_config', {}).get('leader_site'))
else:
raise PatroniCtlException('Multisite is not active or there is no leader site, cannot switch sites')

if switchover_leader is None:
if force:
switchover_leader = leader_site
else:
prompt = 'Leader site'
switchover_leader = click.prompt(prompt, type=str, default=leader_site)

if leader_site != switchover_leader:
raise PatroniCtlException(f'Site {switchover_leader} is not the leader of cluster {cluster_name}')

# multisite_dcs = get_dcs(cluster_name, group, True)
# multisite_cluster = multisite_dcs.get_cluster()

candidate_names = [str(m.multisite['name']) for m in cluster.members
if m.multisite and m.multisite['name'] != leader_site]
Review comment: I think this should not work; the other sites' members are not listed in this site's DCS, meaning candidate_names will only contain the name of the site this is executed on. This is only correct when executed on a standby site in a 2-node cluster.

# We sort the names for consistent output to the client
candidate_names.sort()

# TODO: once there is a reliable way for getting the candidate sites when on the leader site, turn this back on
# if not candidate_names:
# raise PatroniCtlException('No candidates found to switch over to')

if candidate is None and not force:
candidate = click.prompt('Candidate ' + str(candidate_names), type=str, default='')

if candidate and candidate not in candidate_names:
if candidate == cluster_leader:
raise PatroniCtlException(
f'Site {candidate} is already the leader of cluster {cluster_name}')
raise PatroniCtlException(
f'Site {candidate} does not exist in cluster {cluster_name}')

scheduled_at_str = None
scheduled_at = None

if scheduled is None and not force:
next_hour = (datetime.datetime.now() + datetime.timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M')
scheduled = click.prompt('When should the switchover take place (e.g. ' + next_hour + ' ) ',
type=str, default='now')

scheduled_at = parse_scheduled(scheduled)
if scheduled_at:
if config.is_paused:
raise PatroniCtlException("Can't schedule switchover in the paused state")
scheduled_at_str = scheduled_at.isoformat()

switchover_value = {'candidate': candidate}

if scheduled_at_str:
switchover_value['scheduled_at'] = scheduled_at_str
Review comment: This value is ignored by the multisite code. Scheduled switchover is not yet implemented.


logging.debug(switchover_value)

# By now we have established that the leader site exists and the candidate also exists
if not force:
demote_msg = f' to another site, demoting current site {cluster_leader}' if cluster_leader else ''
if scheduled_at_str:
if not click.confirm(f'Are you sure you want to schedule switchover of cluster '
f'{cluster_name} at {scheduled_at_str}{demote_msg}?'):
raise PatroniCtlException('Aborting scheduled switchover')
else:
if not click.confirm(f'Are you sure you want to switch over cluster {cluster_name}{demote_msg}?'):
raise PatroniCtlException('Aborting switchover')

r = None
try:
# We would already have thrown an exception if there was no leader
member = cluster.leader.member if cluster.leader else candidate and cluster.get_member(candidate, False)
if TYPE_CHECKING: # pragma: no cover
assert isinstance(member, Member)
r = request_patroni(member, 'post', 'site_switchover', switchover_value)

if r.status in (200, 202):
logging.debug(r)
cluster = dcs.get_cluster()
logging.debug(cluster)
click.echo('{0} {1}'.format(timestamp(), r.data.decode('utf-8')))
else:
click.echo('Multisite switchover failed, details: {0}, {1}'.format(r.status, r.data.decode('utf-8')))
return
except Exception:
logging.exception(r)
logging.warning('Failing over to DCS')
click.echo('{0} Could not perform site switchover using Patroni API, falling back to DCS'.format(timestamp()))
dcs.manual_failover(leader=switchover_leader, candidate='', target_site=candidate, scheduled_at=scheduled_at)

output_members(cluster, cluster_name, group=group)


@ctl.command('failover', help='Failover to a replica')
@arg_cluster_name
@option_citus_group
@@ -1455,6 +1612,36 @@ def switchover(cluster_name: str, group: Optional[int], leader: Optional[str],
_do_failover_or_switchover('switchover', cluster_name, group, candidate, force, leader, scheduled)


@ctl.command('site-switchover', help='Switchover to another data centre')
@arg_cluster_name
@option_citus_group
@click.option('--leader-site', '--primary-site', 'leader_site', help='The name of the current leader site',
default=None)
@click.option('--candidate-site', 'candidate_site', help='The name of the candidate', default=None)
@click.option('--scheduled', help='Timestamp of a scheduled switchover in unambiguous format (e.g. ISO 8601)',
default=None)
@option_force
def site_switchover(cluster_name: str, group: Optional[int], leader_site: Optional[str],
candidate_site: Optional[str], force: bool, scheduled: Optional[str]) -> None:
"""Process ``multisite-switchover`` command of ``patronictl`` utility.

Perform a site switchover operation in the multisite cluster.

.. seealso::
Refer to :func:`_do_site_switchover` for details.

:param cluster_name: name of the Patroni cluster.
:param group: filter Citus group within which we should perform a switchover. If ``None``, user will be prompted for
filling it -- unless *force* is ``True``, in which case an exception is raised by
:func:`_do_site_switchover`.
:param leader_site: name of the current leader site.
:param candidate_site: name of a standby site to be promoted.
:param force: perform the switchover without asking for confirmations.
:param scheduled: timestamp when the switchover should be scheduled to occur. If ``now``, perform immediately.
"""
_do_site_switchover(cluster_name, group, leader_site, candidate_site, force, scheduled)
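
As a hypothetical usage sketch (cluster and site names are made up; the flag names come from the click options declared above):

    patronictl site-switchover batman --leader-site dc1 --candidate-site dc2 --scheduled now

Passing --force would skip the interactive confirmation prompts, mirroring the regular switchover command.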


def generate_topology(level: int, member: Dict[str, Any],
topology: Dict[Optional[str], List[Dict[str, Any]]]) -> Iterator[Dict[str, Any]]:
"""Recursively yield members with their names adjusted according to their *level* in the cluster topology.
Expand Down Expand Up @@ -1543,13 +1730,12 @@ def get_cluster_service_info(cluster: Dict[str, Any]) -> List[str]:
"""
service_info: List[str] = []



if 'multisite' in cluster:
info = f"Multisite {cluster['multisite']['name'] or ''} is {cluster['multisite']['status'].lower()}"
info = f"Multisite {cluster['multisite'].get('name') or ''} is {cluster['multisite']['status'].lower()}"
standby_config = cluster['multisite'].get('standby_config', {})
if standby_config and standby_config.get('host'):
info += f", replicating from {standby_config['host']}:{standby_config.get('port', 5432)}"
info += f", replicating from {standby_config['leader_site']}"
info += f" ({standby_config['host']}:{standby_config.get('port', 5432)})"
service_info.append(info)

if cluster.get('pause'):
15 changes: 13 additions & 2 deletions patroni/dcs/__init__.py
@@ -332,6 +332,11 @@ def receive_lsn(self) -> Optional[int]:
def replay_lsn(self) -> Optional[int]:
return parse_int(self.data.get('replay_lsn'))

@property
def multisite(self) -> Optional[Dict[str, Any]]:
"""The ``multisite`` dict of the member if multisite is on."""
return self.data.get('multisite')


class RemoteMember(Member):
"""Represents a remote member (typically a primary) for a standby cluster.
@@ -434,6 +439,11 @@ def checkpoint_after_promote(self) -> Optional[bool]:
and 'checkpoint_after_promote' not in self.data
return None

@property
def multisite(self) -> Optional[Dict[str, Any]]:
"""Multisite dict of the member data of the :class:`Member` instance if multisite is active"""
return self.member.data.get('multisite')
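
For illustration, a sketch of the shape this dict can take; the keys are inferred from how patroni/ctl.py reads it, and the values are hypothetical:

    # A member on a standby site; on the leader site 'standby_config' would be absent.
    member.data['multisite'] = {
        'name': 'dc2',
        'standby_config': {'leader_site': 'dc1', 'host': '10.0.0.1', 'port': 5432},
    }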


class Failover(NamedTuple):
"""Immutable object (namedtuple) representing configuration information required for failover/switchover capability.
@@ -503,14 +513,15 @@ def from_node(version: _Version, value: Union[str, Dict[str, str]]) -> 'Failover
t = [a.strip() for a in value.split(':')]
leader = t[0]
candidate = t[1] if len(t) > 1 else None
return Failover(version, leader, candidate, None)
return Failover(version, leader, candidate, None, '')
else:
data = {}

if data.get('scheduled_at'):
data['scheduled_at'] = dateutil.parser.parse(data['scheduled_at'])

return Failover(version, data.get('leader'), data.get('member'), data.get('scheduled_at'), data.get('target_site'))
return Failover(version, data.get('leader'), data.get('member'), data.get('scheduled_at'),
data.get('target_site'))

def __len__(self) -> int:
"""Implement ``len`` function capability.
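For illustration, a sketch of the two value forms from_node() handles after this change (the version and names are hypothetical):

    # Legacy colon-separated string: target_site now defaults to ''.
    Failover.from_node(1, 'pg0:pg1')   # -> Failover(1, 'pg0', 'pg1', None, '')
    # Dict form: 'target_site' is carried through for site switchover requests.
    Failover.from_node(1, {'leader': 'pg0', 'member': 'pg1', 'target_site': 'dc2'})
    # -> Failover(1, 'pg0', 'pg1', None, 'dc2')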
3 changes: 2 additions & 1 deletion patroni/dcs/etcd3.py
@@ -765,7 +765,8 @@ def _cluster_from_nodes(self, nodes: Dict[str, Any]) -> Cluster:

# get leader
leader = nodes.get(self._LEADER)
if not self._ctl and not self._multisite and leader and leader['value'] == self._name and self._lease != leader.get('lease'):
if not self._ctl and not self._multisite and leader and leader['value'] == self._name and \
self._lease != leader.get('lease'):
logger.warning('I am the leader but not owner of the lease')

if leader:
9 changes: 5 additions & 4 deletions patroni/ha.py
@@ -358,7 +358,7 @@ def acquire_lock(self) -> bool:
self.set_is_leader(ret)
multisite_ret = self.patroni.multisite.resolve_leader()
if multisite_ret:
logger.error("Releasing leader lock because multi site status is: "+multisite_ret)
logger.error("Releasing leader lock because multi site status is: " + multisite_ret)
self.dcs.delete_leader()
return False
return ret
@@ -1582,7 +1582,7 @@ def demote(self, mode: str) -> Optional[bool]:
'graceful': dict(stop='fast', checkpoint=True, release=True, offline=False, async_req=False), # noqa: E241,E501
'immediate': dict(stop='immediate', checkpoint=False, release=True, offline=False, async_req=True), # noqa: E241,E501
'immediate-nolock': dict(stop='immediate', checkpoint=False, release=False, offline=False, async_req=True), # noqa: E241,E501
'multisite': dict(stop='fast', checkpoint=True, release=False, offline=True, async_req=False), # noqa: E241,E501
'multisite': dict(stop='fast', checkpoint=True, release=False, offline=True, async_req=False), # noqa: E241,E501
}[mode]

logger.info('Demoting self (%s)', mode)
@@ -1604,7 +1604,7 @@ def on_shutdown(checkpoint_location: int, prev_location: int) -> None:
status['released'] = True

if mode == 'multisite':
on_shutdown = self.patroni.multisite.on_shutdown
on_shutdown = self.patroni.multisite.on_shutdown # noqa: F811

def before_shutdown() -> None:
if self.state_handler.mpp_handler.is_coordinator():
@@ -1739,7 +1739,8 @@ def process_unhealthy_cluster(self) -> str:
if failover:
if self.is_paused() and failover.leader and failover.candidate:
logger.info('Updating failover key after acquiring leader lock...')
self.dcs.manual_failover('', failover.candidate, failover.scheduled_at, version=failover.version)
self.dcs.manual_failover('', failover.candidate, failover.scheduled_at,
version=failover.version)
else:
logger.info('Cleaning up failover key after acquiring leader lock...')
self.dcs.manual_failover('', '')