|
1 | 1 | # Copyright 2022 Canonical Ltd.
|
2 | 2 | # See LICENSE file for licensing details.
|
3 | 3 | import asyncio
|
| 4 | +import json |
4 | 5 | import os
|
5 | 6 | import string
|
6 | 7 | import subprocess
|
|
16 | 17 | from kubernetes import config
|
17 | 18 | from kubernetes.client.api import core_v1_api
|
18 | 19 | from kubernetes.stream import stream
|
19 |
| -from lightkube.core.client import Client |
20 |
| -from lightkube.resources.core_v1 import Pod |
| 20 | +from lightkube.core.client import Client, GlobalResource |
| 21 | +from lightkube.resources.core_v1 import ( |
| 22 | + PersistentVolume, |
| 23 | + PersistentVolumeClaim, |
| 24 | + Pod, |
| 25 | +) |
21 | 26 | from pytest_operator.plugin import OpsTest
|
22 | 27 | from tenacity import (
|
23 | 28 | RetryError,
|
|
32 | 37 | APPLICATION_NAME,
|
33 | 38 | app_name,
|
34 | 39 | db_connect,
|
| 40 | + execute_query_on_unit, |
35 | 41 | get_password,
|
| 42 | + get_password_on_unit, |
36 | 43 | get_primary,
|
37 | 44 | get_unit_address,
|
| 45 | + run_command_on_unit, |
38 | 46 | )
|
39 | 47 |
|
40 | 48 | PORT = 5432
|
@@ -477,7 +485,7 @@ async def get_sync_standby(model: Model, application_name: str) -> str:
|
477 | 485 | async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
|
478 | 486 | """Test a connection to a PostgreSQL server."""
|
479 | 487 | app = unit_name.split("/")[0]
|
480 |
| - password = await get_password(ops_test, database_app_name=app, down_unit=unit_name) |
| 488 | + password = await get_password(ops_test, database_app_name=app, unit_name=unit_name) |
481 | 489 | address = await get_unit_address(ops_test, unit_name)
|
482 | 490 | try:
|
483 | 491 | for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
|
@@ -819,3 +827,227 @@ async def stop_continuous_writes(ops_test: OpsTest) -> int:
|
819 | 827 | )
|
820 | 828 | action = await action.wait()
|
821 | 829 | return int(action.results["writes"])
|
| 830 | + |
| 831 | + |
| 832 | +async def get_storage_id(ops_test: OpsTest, unit_name: str) -> str: |
| 833 | + """Retrieves storage id associated with provided unit. |
| 834 | +
|
| 835 | + Note: this function exists as a temporary solution until this issue is ported to libjuju 2: |
| 836 | + https://github.com/juju/python-libjuju/issues/694 |
| 837 | + """ |
| 838 | + model_name = ops_test.model.info.name |
| 839 | + proc = subprocess.check_output(f"juju storage --model={model_name}".split()) |
| 840 | + proc = proc.decode("utf-8") |
| 841 | + for line in proc.splitlines(): |
| 842 | + if "Storage" in line: |
| 843 | + continue |
| 844 | + |
| 845 | + if len(line) == 0: |
| 846 | + continue |
| 847 | + |
| 848 | + if "detached" in line: |
| 849 | + continue |
| 850 | + |
| 851 | + if line.split()[0] == unit_name: |
| 852 | + return line.split()[1] |
| 853 | + |
| 854 | + |
| 855 | +def is_pods_exists(ops_test: OpsTest, unit_name: str) -> bool: |
| 856 | + client = Client(namespace=ops_test.model.name) |
| 857 | + pods = client.list(Pod, namespace=ops_test.model.name) |
| 858 | + |
| 859 | + for pod in pods: |
| 860 | + print( |
| 861 | + f"Pod: {pod.metadata.name} STATUS: {pod.status.phase} TAGGED: {unit_name.replace('/', '-')}" |
| 862 | + ) |
| 863 | + if (pod.metadata.name == unit_name.replace("/", "-")) and (pod.status.phase == "Running"): |
| 864 | + return True |
| 865 | + |
| 866 | + return False |
| 867 | + |
| 868 | + |
| 869 | +async def is_storage_exists(ops_test: OpsTest, storage_id: str) -> bool: |
| 870 | + """Returns True if storage exists by provided storage ID.""" |
| 871 | + complete_command = [ |
| 872 | + "show-storage", |
| 873 | + "-m", |
| 874 | + f"{ops_test.controller_name}:{ops_test.model.info.name}", |
| 875 | + storage_id, |
| 876 | + "--format=json", |
| 877 | + ] |
| 878 | + return_code, stdout, _ = await ops_test.juju(*complete_command) |
| 879 | + if return_code != 0: |
| 880 | + if return_code == 1: |
| 881 | + return storage_id in stdout |
| 882 | + raise Exception( |
| 883 | + "Expected command %s to succeed instead it failed: %s with code: ", |
| 884 | + complete_command, |
| 885 | + stdout, |
| 886 | + return_code, |
| 887 | + ) |
| 888 | + return storage_id in str(stdout) |
| 889 | + |
| 890 | + |
| 891 | +@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True) |
| 892 | +async def create_db(ops_test: OpsTest, app: str, db: str) -> None: |
| 893 | + """Creates database with specified name.""" |
| 894 | + unit = ops_test.model.applications[app].units[0] |
| 895 | + unit_address = await get_unit_address(ops_test, unit.name) |
| 896 | + password = await get_password_on_unit(ops_test, "operator", unit, app) |
| 897 | + |
| 898 | + conn = db_connect(unit_address, password) |
| 899 | + conn.autocommit = True |
| 900 | + cursor = conn.cursor() |
| 901 | + cursor.execute(f"CREATE DATABASE {db};") |
| 902 | + cursor.close() |
| 903 | + conn.close() |
| 904 | + |
| 905 | + |
| 906 | +@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True) |
| 907 | +async def check_db(ops_test: OpsTest, app: str, db: str) -> bool: |
| 908 | + """Returns True if database with specified name already exists.""" |
| 909 | + unit = ops_test.model.applications[app].units[0] |
| 910 | + unit_address = await get_unit_address(ops_test, unit.name) |
| 911 | + password = await get_password_on_unit(ops_test, "operator", unit, app) |
| 912 | + |
| 913 | + query = await execute_query_on_unit( |
| 914 | + unit_address, |
| 915 | + password, |
| 916 | + f"select datname from pg_catalog.pg_database where datname = '{db}';", |
| 917 | + ) |
| 918 | + |
| 919 | + if "ERROR" in query: |
| 920 | + raise Exception(f"Database check is failed with postgresql err: {query}") |
| 921 | + |
| 922 | + return db in query |
| 923 | + |
| 924 | + |
| 925 | +async def get_any_deatached_storage(ops_test: OpsTest) -> str: |
| 926 | + """Returns any of the current available deatached storage.""" |
| 927 | + return_code, storages_list, stderr = await ops_test.juju( |
| 928 | + "storage", "-m", f"{ops_test.controller_name}:{ops_test.model.info.name}", "--format=json" |
| 929 | + ) |
| 930 | + if return_code != 0: |
| 931 | + raise Exception(f"failed to get storages info with error: {stderr}") |
| 932 | + |
| 933 | + parsed_storages_list = json.loads(storages_list) |
| 934 | + for storage_name, storage in parsed_storages_list["storage"].items(): |
| 935 | + if (str(storage["status"]["current"]) == "detached") and (str(storage["life"] == "alive")): |
| 936 | + return storage_name |
| 937 | + |
| 938 | + raise Exception("failed to get deatached storage") |
| 939 | + |
| 940 | + |
| 941 | +async def check_system_id_mismatch(ops_test: OpsTest, unit_name: str) -> bool: |
| 942 | + """Returns True if system id mismatch if found in logs.""" |
| 943 | + log_str = f'CRITICAL: system ID mismatch, node {unit_name.replace("/", "-")} belongs to a different cluster' |
| 944 | + stdout = await run_command_on_unit( |
| 945 | + ops_test, |
| 946 | + unit_name, |
| 947 | + """cat /var/log/postgresql/*""", |
| 948 | + ) |
| 949 | + |
| 950 | + return log_str in str(stdout) |
| 951 | + |
| 952 | + |
| 953 | +def delete_pvc(ops_test: OpsTest, pvc: GlobalResource): |
| 954 | + """Deletes PersistentVolumeClaim.""" |
| 955 | + client = Client(namespace=ops_test.model.name) |
| 956 | + client.delete(PersistentVolumeClaim, namespace=ops_test.model.name, name=pvc.metadata.name) |
| 957 | + |
| 958 | + |
| 959 | +def get_pvc(ops_test: OpsTest, unit_name: str): |
| 960 | + """Get PersistentVolumeClaim for unit.""" |
| 961 | + client = Client(namespace=ops_test.model.name) |
| 962 | + pvc_list = client.list(PersistentVolumeClaim, namespace=ops_test.model.name) |
| 963 | + for pvc in pvc_list: |
| 964 | + if unit_name.replace("/", "-") in pvc.metadata.name: |
| 965 | + return pvc |
| 966 | + return None |
| 967 | + |
| 968 | + |
| 969 | +def get_pv(ops_test: OpsTest, unit_name: str): |
| 970 | + """Get PersistentVolume for unit.""" |
| 971 | + client = Client(namespace=ops_test.model.name) |
| 972 | + pv_list = client.list(PersistentVolume, namespace=ops_test.model.name) |
| 973 | + for pv in pv_list: |
| 974 | + if unit_name.replace("/", "-") in str(pv.spec.hostPath.path): |
| 975 | + return pv |
| 976 | + return None |
| 977 | + |
| 978 | + |
| 979 | +def change_pv_reclaim_policy(ops_test: OpsTest, pvc_config: PersistentVolumeClaim, policy: str): |
| 980 | + """Change PersistentVolume reclaim policy config value.""" |
| 981 | + client = Client(namespace=ops_test.model.name) |
| 982 | + res = client.patch( |
| 983 | + PersistentVolume, |
| 984 | + pvc_config.metadata.name, |
| 985 | + {"spec": {"persistentVolumeReclaimPolicy": f"{policy}"}}, |
| 986 | + namespace=ops_test.model.name, |
| 987 | + ) |
| 988 | + return res |
| 989 | + |
| 990 | + |
| 991 | +def remove_pv_claimref(ops_test: OpsTest, pv_config: PersistentVolume): |
| 992 | + """Remove claimRef config value for PersistentVolume.""" |
| 993 | + client = Client(namespace=ops_test.model.name) |
| 994 | + client.patch( |
| 995 | + PersistentVolume, |
| 996 | + pv_config.metadata.name, |
| 997 | + {"spec": {"claimRef": None}}, |
| 998 | + namespace=ops_test.model.name, |
| 999 | + ) |
| 1000 | + |
| 1001 | + |
| 1002 | +def change_pvc_pv_name( |
| 1003 | + pvc_config: PersistentVolumeClaim, pv_name_new: str |
| 1004 | +) -> PersistentVolumeClaim: |
| 1005 | + """Change PersistentVolume name config value for PersistentVolumeClaim.""" |
| 1006 | + pvc_config.spec.volumeName = pv_name_new |
| 1007 | + del pvc_config.metadata.annotations["pv.kubernetes.io/bind-completed"] |
| 1008 | + del pvc_config.metadata.uid |
| 1009 | + return pvc_config |
| 1010 | + |
| 1011 | + |
| 1012 | +def apply_pvc_config(ops_test: OpsTest, pvc_config: PersistentVolumeClaim): |
| 1013 | + """Apply provided PersistentVolumeClaim config.""" |
| 1014 | + client = Client(namespace=ops_test.model.name) |
| 1015 | + pvc_config.metadata.managedFields = None |
| 1016 | + client.apply(pvc_config, namespace=ops_test.model.name, field_manager="lightkube") |
| 1017 | + |
| 1018 | + |
| 1019 | +async def remove_unit_force(ops_test: OpsTest, num_units: int): |
| 1020 | + """Removes unit with --force --no-wait.""" |
| 1021 | + app_name_str = await app_name(ops_test) |
| 1022 | + scale = len(ops_test.model.applications[app_name_str].units) - num_units |
| 1023 | + complete_command = [ |
| 1024 | + "remove-unit", |
| 1025 | + f"{app_name_str}", |
| 1026 | + "--force", |
| 1027 | + "--no-wait", |
| 1028 | + "--no-prompt", |
| 1029 | + "--num-units", |
| 1030 | + num_units, |
| 1031 | + ] |
| 1032 | + return_code, stdout, stderr = await ops_test.juju(*complete_command) |
| 1033 | + if return_code != 0: |
| 1034 | + raise Exception( |
| 1035 | + "Expected command %s to succeed instead it failed: %s with err: %s with code: %s", |
| 1036 | + complete_command, |
| 1037 | + stdout, |
| 1038 | + stderr, |
| 1039 | + return_code, |
| 1040 | + ) |
| 1041 | + |
| 1042 | + if scale == 0: |
| 1043 | + await ops_test.model.block_until( |
| 1044 | + lambda: len(ops_test.model.applications[app_name_str].units) == scale, |
| 1045 | + timeout=1000, |
| 1046 | + ) |
| 1047 | + else: |
| 1048 | + await ops_test.model.wait_for_idle( |
| 1049 | + apps=[app_name_str], |
| 1050 | + status="active", |
| 1051 | + timeout=1000, |
| 1052 | + wait_for_exact_units=scale, |
| 1053 | + ) |
0 commit comments