|
| 1 | +import os |
| 2 | +import yaml |
| 3 | +import subprocess |
| 4 | +import base64 |
| 5 | +import time |
| 6 | +import logging |
| 7 | +from interface import KubernetesClusterPlugin |
| 8 | + |
| 9 | +logger = logging.getLogger("PluginClusterStacks") |
| 10 | + |
| 11 | +# Helper functions |
| 12 | +def wait_for_pods(self, namespaces, timeout=240, interval=15, kubeconfig=None): |
| 13 | + """ |
| 14 | + Waits for all pods in specified namespaces to reach the condition 'Ready'. |
| 15 | +
|
| 16 | + :param namespaces: List of namespaces to check for pod readiness. |
| 17 | + :param timeout: Total time to wait in seconds before giving up. |
| 18 | + :param interval: Time to wait between checks in seconds. |
| 19 | + :param kubeconfig: Optional path to the kubeconfig file for the target Kubernetes cluster. |
| 20 | + :return: True if all pods are ready within the given timeout, raises TimeoutError otherwise. |
| 21 | + """ |
| 22 | + start_time = time.time() |
| 23 | + |
| 24 | + while time.time() - start_time < timeout: |
| 25 | + all_pods_ready = True |
| 26 | + |
| 27 | + for namespace in namespaces: |
| 28 | + try: |
| 29 | + # Get pod status in the namespace |
| 30 | + wait_pods_command = ( |
| 31 | + f"kubectl wait -n {namespace} --for=condition=Ready --timeout={timeout}s pod --all" |
| 32 | + ) |
| 33 | + result = self._run_subprocess(wait_pods_command, f"Error fetching pods in {namespace}", shell=True, capture_output=True, text=True, kubeconfig=kubeconfig) |
| 34 | + |
| 35 | + if result.returncode != 0: |
| 36 | + logger.warning(f"Not all pods in namespace {namespace} are ready. Details: {result.stderr}") |
| 37 | + all_pods_ready = False |
| 38 | + else: |
| 39 | + logger.info(f"All pods in namespace {namespace} are ready.") |
| 40 | + |
| 41 | + except subprocess.CalledProcessError as error: |
| 42 | + logger.error(f"Error checking pods in {namespace}: {error}") |
| 43 | + all_pods_ready = False |
| 44 | + |
| 45 | + if all_pods_ready: |
| 46 | + logger.info("All specified pods are ready in all namespaces.") |
| 47 | + return True |
| 48 | + |
| 49 | + logger.info("Waiting for all pods in specified namespaces to become ready...") |
| 50 | + time.sleep(interval) |
| 51 | + |
| 52 | + raise TimeoutError(f"Timed out after {timeout} seconds waiting for pods in namespaces {namespaces} to become ready.") |
| 53 | + |
| 54 | + |
| 55 | +def load_config(config_path): |
| 56 | + """ |
| 57 | + Loads the configuration from a YAML file. |
| 58 | + """ |
| 59 | + |
| 60 | + with open(config_path, 'r') as file: |
| 61 | + config = yaml.safe_load(file) or {} |
| 62 | + return config |
| 63 | + |
| 64 | +class PluginClusterStacksRemoteAPI(KubernetesClusterPlugin): |
| 65 | + def __init__(self, config_file=None): |
| 66 | + self.config = load_config(config_file) if config_file else {} |
| 67 | + logger.debug(self.config) |
| 68 | + self.working_directory = os.getcwd() |
| 69 | + logger.debug(f"Working from {self.working_directory}") |
| 70 | + self.kubeconfig_mgmnt = self.config['kubeconfig'] |
| 71 | + self.workloadclusters = self.config['workloadcluster'] |
| 72 | + self.cs_namespace = self.config['namespace'] |
| 73 | + |
| 74 | + |
| 75 | + def create_cluster(self, cluster_name=None, version=None, kubeconfig_filepath=None): |
| 76 | + self.cluster_name = cluster_name |
| 77 | + self.cluster_version = version |
| 78 | + self.kubeconfig_cs_cluster = kubeconfig_filepath |
| 79 | + |
| 80 | + # Create workload cluster |
| 81 | + self._apply_yaml(self.workloadclusters, "Error applying cluster.yaml", kubeconfig=self.kubeconfig_mgmnt) |
| 82 | + |
| 83 | + #TODO:!!! We also need to introduce a waiting function here |
| 84 | + |
| 85 | + print("retrieve kubeconfig to path") |
| 86 | + self._retrieve_kubeconfig(namespace=self.cs_namespace, kubeconfig=self.kubeconfig_mgmnt) |
| 87 | + |
| 88 | + # Wait for workload system pods to be ready |
| 89 | + # wait_for_workload_pods_ready(kubeconfig_path=self.kubeconfig_cs_cluster) |
| 90 | + # ~ wait_for_pods(self, ["kube-system"], timeout=600, interval=15, kubeconfig=self.kubeconfig_cs_cluster) |
| 91 | + |
| 92 | + |
| 93 | + def delete_cluster(self, cluster_name=None): #TODO:!!! need to adjust delete method |
| 94 | + self.cluster_name = cluster_name |
| 95 | + #Get the name of the workloadcluster from the config file |
| 96 | + workload_cluster_config = load_config(self.workloadclusters) |
| 97 | + workload_cluster_name = workload_cluster_config['metadata']['name'] |
| 98 | + try: |
| 99 | + # Check if the cluster exists |
| 100 | + check_cluster_command = f"kubectl --kubeconfig={self.kubeconfig_mgmnt} get cluster {workload_cluster_name} -n {self.cs_namespace}" |
| 101 | + result = self._run_subprocess(check_cluster_command, "Failed to get cluster resource", shell=True, capture_output=True, text=True) |
| 102 | + |
| 103 | + # Proceed with deletion only if the cluster exists |
| 104 | + if result.returncode == 0: |
| 105 | + delete_command = f"kubectl --kubeconfig={self.kubeconfig_mgmnt} delete cluster {workload_cluster_name} --timeout=600s -n {self.cs_namespace}" |
| 106 | + self._run_subprocess(delete_command, "Timeout while deleting the cluster", shell=True) |
| 107 | + |
| 108 | + except subprocess.CalledProcessError as error: |
| 109 | + if "NotFound" in error.stderr: |
| 110 | + logger.info(f"Cluster {workload_cluster_name} not found. Skipping deletion.") |
| 111 | + else: |
| 112 | + raise RuntimeError(f"Error checking for cluster existence: {error}") |
| 113 | + |
| 114 | + |
| 115 | + def _apply_yaml(self, yaml_file, error_msg, kubeconfig=None): |
| 116 | + """ |
| 117 | + Applies a Kubernetes YAML configuration file to the cluster, substituting environment variables as needed. |
| 118 | +
|
| 119 | + :param yaml_file: The name of the YAML file to apply. |
| 120 | + :param kubeconfig: Optional path to a kubeconfig file, which specifies which Kubernetes cluster |
| 121 | + to apply the YAML configuration to. |
| 122 | + """ |
| 123 | + try: |
| 124 | + # Determine if the file is a local path or a URL |
| 125 | + if os.path.isfile(yaml_file): |
| 126 | + command = f"kubectl --kubeconfig={self.kubeconfig_mgmnt} apply -f {yaml_file} -n {self.cs_namespace}" |
| 127 | + else: |
| 128 | + raise ValueError(f"Unknown file: {yaml_file}") |
| 129 | + |
| 130 | + self._run_subprocess(command, error_msg, shell=True) |
| 131 | + |
| 132 | + except subprocess.CalledProcessError as error: |
| 133 | + raise RuntimeError(f"{error_msg}: {error}") |
| 134 | + |
| 135 | + |
| 136 | + def _retrieve_kubeconfig(self, namespace="default", kubeconfig=None): |
| 137 | + """ |
| 138 | + Retrieves the kubeconfig for the specified cluster and saves it to a local file. |
| 139 | +
|
| 140 | + :param namespace: The namespace of the cluster to retrieve the kubeconfig for. |
| 141 | + :param kubeconfig: Optional path to the kubeconfig file for the target Kubernetes cluster. |
| 142 | + """ |
| 143 | + |
| 144 | + #Get the name of the workloadcluster from the config file |
| 145 | + workload_cluster_config = load_config(self.workloadclusters) |
| 146 | + workload_cluster_name = workload_cluster_config['metadata']['name'] |
| 147 | + |
| 148 | + command_args = [ |
| 149 | + "kubectl ", |
| 150 | + f"--kubeconfig={self.kubeconfig_mgmnt}", |
| 151 | + f"-n {self.cs_namespace}", |
| 152 | + f"get secret {workload_cluster_name}-kubeconfig", |
| 153 | + "-o go-template='{{.data.value|base64decode}}'", |
| 154 | + f"> {self.kubeconfig_cs_cluster}", |
| 155 | + ] |
| 156 | + kubeconfig_command = "" |
| 157 | + for entry in command_args: |
| 158 | + kubeconfig_command += entry + " " |
| 159 | + self._run_subprocess(kubeconfig_command, "Error retrieving kubeconfig", shell=True) |
| 160 | + |
| 161 | + |
| 162 | + def _run_subprocess(self, command, error_msg, shell=False, capture_output=False, text=False): |
| 163 | + """ |
| 164 | + Executes a subprocess command with the specified environment variables and parameters. |
| 165 | +
|
| 166 | + :param command: The shell command to be executed. This can be a string or a list of arguments to pass to the subprocess. |
| 167 | + :param error_msg: A custom error message to be logged and raised if the subprocess fails. |
| 168 | + :param shell: Whether to execute the command through the shell (default: `False`). |
| 169 | + :param capture_output: Whether to capture the command's standard output and standard error (default: `False`). |
| 170 | + :param text: Whether to treat the command's output and error as text (default: `False`). |
| 171 | + :return: The result of the `subprocess.run` command |
| 172 | + """ |
| 173 | + try: |
| 174 | + # Run the subprocess |
| 175 | + result = subprocess.run(command, shell=shell, capture_output=capture_output, text=text, check=True) |
| 176 | + return result |
| 177 | + except subprocess.CalledProcessError as error: |
| 178 | + logger.error(f"{error_msg}: {error}") |
| 179 | + raise |
0 commit comments