|
63 | 63 |
|
64 | 64 | from . import global_config |
65 | 65 | from .config import Config |
66 | | -from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Member |
| 66 | +from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Leader, Member |
67 | 67 | from .exceptions import PatroniException |
68 | 68 | from .postgresql.misc import postgres_version_to_int, PostgresqlRole, PostgresqlState |
69 | 69 | from .postgresql.mpp import get_mpp |
@@ -512,11 +512,14 @@ def watching(w: bool, watch: Optional[int], max_count: Optional[int] = None, cle |
512 | 512 | return |
513 | 513 |
|
514 | 514 | counter = 1 |
| 515 | + yield_time = time.time() |
515 | 516 | while watch and counter <= (max_count or counter): |
516 | | - time.sleep(watch) |
| 517 | + elapsed = time.time() - yield_time |
| 518 | + time.sleep(max(0, watch - elapsed)) |
517 | 519 | counter += 1 |
518 | 520 | if clear: |
519 | 521 | click.clear() |
| 522 | + yield_time = time.time() |
520 | 523 | yield 0 |
521 | 524 |
|
522 | 525 |
|
@@ -2314,3 +2317,115 @@ def format_pg_version(version: int) -> str: |
2314 | 2317 | return "{0}.{1}.{2}".format(version // 10000, version // 100 % 100, version % 100) |
2315 | 2318 | else: |
2316 | 2319 | return "{0}.{1}".format(version // 10000, version % 100) |
| 2320 | + |
| 2321 | + |
| 2322 | +def change_cluster_role(cluster_name: str, force: bool, standby_config: Optional[Dict[str, Any]]) -> None: |
| 2323 | + """Demote or promote cluster. |
| 2324 | +
|
| 2325 | + :param cluster_name: name of the Patroni cluster. |
| 2326 | + :param force: if ``True`` run cluster demotion without asking for confirmation. |
| 2327 | + :param standby_config: standby cluster configuration to be applied if demotion is requested. |
| 2328 | + """ |
| 2329 | + demote = bool(standby_config) |
| 2330 | + action_name = 'demot' if demote else 'promot' |
| 2331 | + target_role = PostgresqlRole.STANDBY_LEADER if demote else PostgresqlRole.PRIMARY |
| 2332 | + |
| 2333 | + dcs = get_dcs(cluster_name, None) |
| 2334 | + cluster = dcs.get_cluster() |
| 2335 | + leader_name = cluster.leader and cluster.leader.name |
| 2336 | + if not leader_name: |
| 2337 | + raise PatroniCtlException(f'Cluster has no leader, {action_name}ion is not possible') |
| 2338 | + if cluster.leader and cluster.leader.data.get('role') == target_role: |
| 2339 | + raise PatroniCtlException('Cluster is already in the required state') |
| 2340 | + |
| 2341 | + click.echo('Current cluster topology') |
| 2342 | + output_members(cluster, cluster_name) |
| 2343 | + if not force: |
| 2344 | + confirm = click.confirm(f'Are you sure you want to {action_name}e {cluster_name} cluster?') |
| 2345 | + if not confirm: |
| 2346 | + raise PatroniCtlException(f'Aborted cluster {action_name}ion') |
| 2347 | + |
| 2348 | + try: |
| 2349 | + if TYPE_CHECKING: # pragma: no cover |
| 2350 | + assert isinstance(cluster.leader, Leader) |
| 2351 | + r = request_patroni(cluster.leader.member, 'patch', 'config', {'standby_cluster': standby_config}) |
| 2352 | + |
| 2353 | + if r.status != 200: |
| 2354 | + raise PatroniCtlException( |
| 2355 | + f'Failed to {action_name}e {cluster_name} cluster: ' |
| 2356 | + f'/config PATCH status code={r.status}, ({r.data.decode("utf-8")})') |
| 2357 | + except Exception as err: |
| 2358 | + raise PatroniCtlException(f'Failed to {action_name}e {cluster_name} cluster: {err}') |
| 2359 | + |
| 2360 | + for _ in watching(True, 1, clear=False): |
| 2361 | + cluster = dcs.get_cluster() |
| 2362 | + is_unlocked = cluster.is_unlocked() |
| 2363 | + leader_role = cluster.leader and cluster.leader.data.get('role') |
| 2364 | + leader_state = cluster.leader and cluster.leader.data.get('state') |
| 2365 | + old_leader = cluster.get_member(leader_name, False) |
| 2366 | + old_leader_state = old_leader and old_leader.data.get('state') |
| 2367 | + |
| 2368 | + if not is_unlocked and leader_role == target_role and leader_state == PostgresqlState.RUNNING: |
| 2369 | + if not demote or old_leader_state == PostgresqlState.RUNNING: |
| 2370 | + click.echo( |
| 2371 | + f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is successfully {action_name}ed') |
| 2372 | + break |
| 2373 | + |
| 2374 | + state_prts = [f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} cluster is unlocked: {is_unlocked}', |
| 2375 | + f'leader role: {leader_role}', |
| 2376 | + f'leader state: {leader_state}'] |
| 2377 | + if demote and cluster.leader and leader_name != cluster.leader.name and old_leader_state: |
| 2378 | + state_prts.append(f'previous leader state: {repr(old_leader_state)}') |
| 2379 | + click.echo(", ".join(state_prts)) |
| 2380 | + output_members(cluster, cluster_name) |
| 2381 | + |
| 2382 | + |
| 2383 | +@ctl.command('demote-cluster', help="Demote cluster to a standby cluster") |
| 2384 | +@arg_cluster_name |
| 2385 | +@option_force |
| 2386 | +@click.option('--host', help='Address of the remote node', required=False) |
| 2387 | +@click.option('--port', help='Port of the remote node', type=int, required=False) |
| 2388 | +@click.option('--restore-command', help='Command to restore WAL records from the remote primary', required=False) |
| 2389 | +@click.option('--primary-slot-name', help='Name of the slot on the remote node to use for replication', required=False) |
| 2390 | +def demote_cluster(cluster_name: str, force: bool, host: Optional[str], port: Optional[int], |
| 2391 | + restore_command: Optional[str], primary_slot_name: Optional[str]) -> None: |
| 2392 | + """Process ``demote-cluster`` command of ``patronictl`` utility. |
| 2393 | +
|
| 2394 | + Demote cluster to a standby cluster. |
| 2395 | +
|
| 2396 | + :param cluster_name: name of the Patroni cluster. |
| 2397 | + :param force: if ``True`` run cluster demotion without asking for confirmation. |
| 2398 | + :param host: address of the remote node. |
| 2399 | + :param port: port of the remote node. |
| 2400 | + :param restore_command: command to restore WAL records from the remote primary'. |
| 2401 | + :param primary_slot_name: name of the slot on the remote node to use for replication. |
| 2402 | +
|
| 2403 | + :raises: |
| 2404 | + :class:`PatroniCtlException`: if: |
| 2405 | + * neither ``host`` nor ``port`` nor ``restore_command`` is provided; or |
| 2406 | + * cluster has no leader; or |
| 2407 | + * cluster is already in the required state; or |
| 2408 | + * operation is aborted. |
| 2409 | + """ |
| 2410 | + if not any((host, port, restore_command)): |
| 2411 | + raise PatroniCtlException('At least --host, --port or --restore-command should be specified') |
| 2412 | + |
| 2413 | + data = {k: v for k, v in {'host': host, |
| 2414 | + 'port': port, |
| 2415 | + 'primary_slot_name': primary_slot_name, |
| 2416 | + 'restore_command': restore_command}.items() if v} |
| 2417 | + change_cluster_role(cluster_name, force, data) |
| 2418 | + |
| 2419 | + |
| 2420 | +@ctl.command('promote-cluster', help="Promote cluster, make it run standalone") |
| 2421 | +@arg_cluster_name |
| 2422 | +@option_force |
| 2423 | +def promote_cluster(cluster_name: str, force: bool) -> None: |
| 2424 | + """Process ``promote-cluster`` command of ``patronictl`` utility. |
| 2425 | +
|
| 2426 | + Promote cluster, make it run standalone. |
| 2427 | +
|
| 2428 | + :param cluster_name: name of the Patroni cluster. |
| 2429 | + :param force: if ``True`` run cluster demotion without asking for confirmation. |
| 2430 | + """ |
| 2431 | + change_cluster_role(cluster_name, force, None) |
0 commit comments