11import os
22import yaml
33import subprocess
4- import base64
54import time
65import logging
76from interface import KubernetesClusterPlugin
87
98logger = logging .getLogger ("PluginClusterStacks" )
109
10+
11+ # Default configuration values
12+ DEFAULTS = {
13+ 'cs_name' : 'scs' ,
14+ }
15+
16+ # Keys needed for environment variables
17+ ENV_KEYS = {'cs_name' , 'cs_version' , 'cs_channel' , 'cs_secretname' , 'cs_class_name' ,
18+ 'cs_namespace' , 'cs_pod_cidr' , 'cs_service_cidr' , 'cs_external_id' , 'cs_k8s_patch_version' ,
19+ 'cs_cluster_name' , 'cs_k8s_version' }
20+
21+
1122# Helper functions
1223def wait_for_pods (self , namespaces , timeout = 240 , interval = 15 , kubeconfig = None ):
1324 """
@@ -27,13 +38,20 @@ def wait_for_pods(self, namespaces, timeout=240, interval=15, kubeconfig=None):
2738 for namespace in namespaces :
2839 try :
2940 # Get pod status in the namespace
30- wait_pods_command = (
31- f"kubectl wait -n { namespace } --for=condition=Ready --timeout={ timeout } s pod --all"
41+ wait_pods_command = f"kubectl wait -n { namespace } --for=condition=Ready --timeout={ timeout } s pod --all"
42+ result = self ._run_subprocess (
43+ wait_pods_command ,
44+ f"Error fetching pods in { namespace } " ,
45+ shell = True ,
46+ capture_output = True ,
47+ text = True ,
48+ kubeconfig = kubeconfig
3249 )
33- result = self ._run_subprocess (wait_pods_command , f"Error fetching pods in { namespace } " , shell = True , capture_output = True , text = True , kubeconfig = kubeconfig )
3450
3551 if result .returncode != 0 :
36- logger .warning (f"Not all pods in namespace { namespace } are ready. Details: { result .stderr } " )
52+ logger .warning (
53+ f"Not all pods in namespace { namespace } are ready. Details: { result .stderr } "
54+ )
3755 all_pods_ready = False
3856 else :
3957 logger .info (f"All pods in namespace { namespace } are ready." )
@@ -49,69 +67,121 @@ def wait_for_pods(self, namespaces, timeout=240, interval=15, kubeconfig=None):
4967 logger .info ("Waiting for all pods in specified namespaces to become ready..." )
5068 time .sleep (interval )
5169
52- raise TimeoutError (f"Timed out after { timeout } seconds waiting for pods in namespaces { namespaces } to become ready." )
70+ raise TimeoutError (
71+ f"Timed out after { timeout } seconds waiting for pods in namespaces { namespaces } to become ready."
72+ )
5373
5474
5575def load_config (config_path ):
5676 """
5777 Loads the configuration from a YAML file.
5878 """
5979
60- with open (config_path , 'r' ) as file :
80+ with open (config_path , "r" ) as file :
6181 config = yaml .safe_load (file ) or {}
82+
83+ base_dir = os .path .dirname (config_path )
84+ if 'kubeconfig' in config :
85+ config ['kubeconfig' ] = os .path .join (base_dir , config ['kubeconfig' ])
86+ if 'workloadcluster' in config :
87+ config ['workloadcluster' ] = os .path .join (base_dir , config ['workloadcluster' ])
88+ if 'clusterstack' in config :
89+ config ['clusterstack' ] = os .path .join (base_dir , config ['clusterstack' ])
90+
6291 return config
6392
93+
94+ def setup_environment_variables (self ):
95+ """
96+ Constructs and returns a dictionary of required environment variables
97+ based on the configuration.
98+
99+ :raises ValueError: If the `GIT_ACCESS_TOKEN` environment variable is not set.
100+
101+ :return: A dictionary of required environment variables with necessary values and
102+ encodings for Kubernetes and Git-related configurations.
103+ """
104+ # Calculate values that need to be set dynamically
105+ if hasattr (self , 'cluster_version' ):
106+ self .config ['cs_k8s_version' ] = self .cluster_version
107+ self .config ['cs_namespace' ] = self .cs_namespace
108+ self .config ['cs_class_name' ] = (
109+ f"openstack-{ self .config ['cs_name' ]} -{ str (self .config ['cs_k8s_version' ]).replace ('.' , '-' )} -"
110+ f"{ self .config ['cs_version' ]} "
111+ )
112+ if hasattr (self , 'cluster_name' ):
113+ self .config ['cs_cluster_name' ] = self .cluster_name
114+
115+ # Construct general environment variables
116+ required_env = {key .upper (): value for key , value in self .config .items () if key in ENV_KEYS }
117+
118+ return required_env
119+
120+
64121class PluginClusterStacksRemoteAPI (KubernetesClusterPlugin ):
65- def __init__ (self , config_file = None ):
122+ def __init__ (self , config_file ):
66123 self .config = load_config (config_file ) if config_file else {}
67124 logger .debug (self .config )
68125 self .working_directory = os .getcwd ()
69126 logger .debug (f"Working from { self .working_directory } " )
70- self .kubeconfig_mgmnt = self .config ['kubeconfig' ]
71- self .workloadclusters = self .config ['workloadcluster' ]
72- self .cs_namespace = self .config ['namespace' ]
73-
74-
75- def create_cluster (self , cluster_name = None , version = None , kubeconfig_filepath = None ):
127+ for key , value in DEFAULTS .items ():
128+ self .config .setdefault (key , value )
129+ self .kubeconfig_mgmnt = self .config ["kubeconfig" ]
130+ self .workloadclusters = self .config ["workloadcluster" ]
131+ self .clusterstack = self .config ["clusterstack" ]
132+ self .cs_namespace = self .config ["namespace" ]
133+
134+ def create_cluster (self , cluster_name , version , kubeconfig_filepath ):
76135 self .cluster_name = cluster_name
77136 self .cluster_version = version
78137 self .kubeconfig_cs_cluster = kubeconfig_filepath
79138
80- # Create workload cluster
81- self ._apply_yaml (self .workloadclusters , "Error applying cluster .yaml" , kubeconfig = self .kubeconfig_mgmnt )
139+ # Create cluster-stack resource
140+ self ._apply_yaml (self .clusterstack , "Error applying clusterstack .yaml" , kubeconfig = self .kubeconfig_mgmnt )
82141
83- #TODO:!!! We also need to introduce a waiting function here
84-
85- print ("retrieve kubeconfig to path" )
142+ # Create workload cluster
143+ self ._apply_yaml (
144+ self .workloadclusters ,
145+ "Error applying cluster.yaml" ,
146+ kubeconfig = self .kubeconfig_mgmnt ,
147+ )
148+
149+ # Get and wait on kubeadmcontrolplane and retrieve workload cluster kubeconfig
150+ kcp_name = self ._get_kubeadm_control_plane_name (namespace = self .cs_namespace , kubeconfig = self .kubeconfig_mgmnt )
151+ self ._wait_kcp_ready (kcp_name , namespace = self .cs_namespace , kubeconfig = self .kubeconfig_mgmnt )
86152 self ._retrieve_kubeconfig (namespace = self .cs_namespace , kubeconfig = self .kubeconfig_mgmnt )
87153
88154 # Wait for workload system pods to be ready
89- # wait_for_workload_pods_ready(kubeconfig_path=self.kubeconfig_cs_cluster)
90- # ~ wait_for_pods(self, ["kube-system"], timeout=600, interval=15, kubeconfig=self.kubeconfig_cs_cluster)
91-
155+ wait_for_pods (self , ["kube-system" ], timeout = 600 , interval = 15 , kubeconfig = self .kubeconfig_cs_cluster )
92156
93- def delete_cluster (self , cluster_name = None ): #TODO:!!! need to adjust delete method
94- self .cluster_name = cluster_name
95- #Get the name of the workloadcluster from the config file
96- workload_cluster_config = load_config (self .workloadclusters )
97- workload_cluster_name = workload_cluster_config ['metadata' ]['name' ]
157+ def delete_cluster (self , cluster_name ):
98158 try :
99159 # Check if the cluster exists
100- check_cluster_command = f"kubectl --kubeconfig={ self .kubeconfig_mgmnt } get cluster { workload_cluster_name } -n { self .cs_namespace } "
101- result = self ._run_subprocess (check_cluster_command , "Failed to get cluster resource" , shell = True , capture_output = True , text = True )
160+ check_cluster_command = f"kubectl get cluster { cluster_name } -n { self .cs_namespace } "
161+ result = self ._run_subprocess (
162+ check_cluster_command ,
163+ "Failed to get cluster resource" ,
164+ shell = True ,
165+ capture_output = True ,
166+ text = True ,
167+ kubeconfig = self .kubeconfig_mgmnt
168+ )
102169
103170 # Proceed with deletion only if the cluster exists
104171 if result .returncode == 0 :
105- delete_command = f"kubectl --kubeconfig={ self .kubeconfig_mgmnt } delete cluster { workload_cluster_name } --timeout=600s -n { self .cs_namespace } "
106- self ._run_subprocess (delete_command , "Timeout while deleting the cluster" , shell = True )
172+ delete_command = f"kubectl delete cluster { cluster_name } --timeout=600s -n { self .cs_namespace } "
173+ self ._run_subprocess (
174+ delete_command , "Timeout while deleting the cluster" , shell = True , kubeconfig = self .kubeconfig_mgmnt
175+ )
107176
108177 except subprocess .CalledProcessError as error :
109178 if "NotFound" in error .stderr :
110- logger .info (f"Cluster { workload_cluster_name } not found. Skipping deletion." )
179+ logger .info (
180+ f"Cluster { cluster_name } not found. Skipping deletion."
181+ )
111182 else :
112183 raise RuntimeError (f"Error checking for cluster existence: { error } " )
113184
114-
115185 def _apply_yaml (self , yaml_file , error_msg , kubeconfig = None ):
116186 """
117187 Applies a Kubernetes YAML configuration file to the cluster, substituting environment variables as needed.
@@ -123,15 +193,63 @@ def _apply_yaml(self, yaml_file, error_msg, kubeconfig=None):
123193 try :
124194 # Determine if the file is a local path or a URL
125195 if os .path .isfile (yaml_file ):
126- command = f"kubectl --kubeconfig= { self . kubeconfig_mgmnt } apply -f { yaml_file } -n { self . cs_namespace } "
196+ command = f"/tmp/envsubst < { yaml_file } | kubectl apply -f - "
127197 else :
128198 raise ValueError (f"Unknown file: { yaml_file } " )
129199
130- self ._run_subprocess (command , error_msg , shell = True )
200+ self ._run_subprocess (command , error_msg , shell = True , kubeconfig = kubeconfig )
131201
132202 except subprocess .CalledProcessError as error :
133203 raise RuntimeError (f"{ error_msg } : { error } " )
134204
205+ def _get_kubeadm_control_plane_name (self , namespace = "default" , kubeconfig = None ):
206+ """
207+ Retrieves the name of the KubeadmControlPlane resource for the Kubernetes cluster
208+ in the specified namespace.
209+
210+ :param namespace: The namespace to search for the KubeadmControlPlane resource.
211+ :param kubeconfig: Optional path to the kubeconfig file for the target Kubernetes cluster.
212+
213+ :return: The name of the KubeadmControlPlane resource as a string.
214+ """
215+ max_retries = 6
216+ delay_between_retries = 20
217+ for _ in range (max_retries ):
218+ try :
219+ kcp_command = (
220+ f"kubectl get kubeadmcontrolplane -n { namespace } "
221+ "-o=jsonpath='{.items[0].metadata.name}'"
222+ )
223+ kcp_name = self ._run_subprocess (kcp_command , "Error retrieving kcp_name" , shell = True , capture_output = True , text = True , kubeconfig = kubeconfig )
224+ logger .info (kcp_name )
225+ kcp_name_stdout = kcp_name .stdout .strip ()
226+ if kcp_name_stdout :
227+ print (f"KubeadmControlPlane name: { kcp_name_stdout } " )
228+ return kcp_name_stdout
229+ except subprocess .CalledProcessError as error :
230+ print (f"Error getting kubeadmcontrolplane name: { error } " )
231+ # Wait before retrying
232+ time .sleep (delay_between_retries )
233+ else :
234+ raise RuntimeError ("Failed to get kubeadmcontrolplane name" )
235+
236+ def _wait_kcp_ready (self , kcp_name , namespace = "default" , kubeconfig = None ):
237+ """
238+ Waits for the specified KubeadmControlPlane resource to become 'Available'.
239+
240+ :param kcp_name: The name of the KubeadmControlPlane resource to check for availability.
241+ :param namespace: The namespace where the KubeadmControlPlane resource is.
242+ :param kubeconfig: Optional path to the kubeconfig file for the target Kubernetes cluster.
243+ """
244+ try :
245+ self ._run_subprocess (
246+ f"kubectl wait kubeadmcontrolplane/{ kcp_name } --for=condition=Available --timeout=600s -n { namespace } " ,
247+ "Error waiting for kubeadmcontrolplane availability" ,
248+ shell = True ,
249+ kubeconfig = kubeconfig
250+ )
251+ except subprocess .CalledProcessError as error :
252+ raise RuntimeError (f"Error waiting for kubeadmcontrolplane to be ready: { error } " )
135253
136254 def _retrieve_kubeconfig (self , namespace = "default" , kubeconfig = None ):
137255 """
@@ -140,26 +258,12 @@ def _retrieve_kubeconfig(self, namespace="default", kubeconfig=None):
140258 :param namespace: The namespace of the cluster to retrieve the kubeconfig for.
141259 :param kubeconfig: Optional path to the kubeconfig file for the target Kubernetes cluster.
142260 """
261+ kubeconfig_command = (
262+ f"sudo -E clusterctl get kubeconfig { self .cluster_name } -n { namespace } > { self .kubeconfig_cs_cluster } "
263+ )
264+ self ._run_subprocess (kubeconfig_command , "Error retrieving kubeconfig" , shell = True , kubeconfig = kubeconfig )
143265
144- #Get the name of the workloadcluster from the config file
145- workload_cluster_config = load_config (self .workloadclusters )
146- workload_cluster_name = workload_cluster_config ['metadata' ]['name' ]
147-
148- command_args = [
149- "kubectl " ,
150- f"--kubeconfig={ self .kubeconfig_mgmnt } " ,
151- f"-n { self .cs_namespace } " ,
152- f"get secret { workload_cluster_name } -kubeconfig" ,
153- "-o go-template='{{.data.value|base64decode}}'" ,
154- f"> { self .kubeconfig_cs_cluster } " ,
155- ]
156- kubeconfig_command = ""
157- for entry in command_args :
158- kubeconfig_command += entry + " "
159- self ._run_subprocess (kubeconfig_command , "Error retrieving kubeconfig" , shell = True )
160-
161-
162- def _run_subprocess (self , command , error_msg , shell = False , capture_output = False , text = False ):
266+ def _run_subprocess (self , command , error_msg , shell = False , capture_output = False , text = False , kubeconfig = None ):
163267 """
164268 Executes a subprocess command with the specified environment variables and parameters.
165269
@@ -168,12 +272,24 @@ def _run_subprocess(self, command, error_msg, shell=False, capture_output=False,
168272 :param shell: Whether to execute the command through the shell (default: `False`).
169273 :param capture_output: Whether to capture the command's standard output and standard error (default: `False`).
170274 :param text: Whether to treat the command's output and error as text (default: `False`).
275+ :param kubeconfig: Optional path to the kubeconfig file for the target Kubernetes cluster.
276+
171277 :return: The result of the `subprocess.run` command
172278 """
173279 try :
174- # Run the subprocess
175- result = subprocess .run (command , shell = shell , capture_output = capture_output , text = text , check = True )
280+ env = setup_environment_variables (self )
281+ env ['PATH' ] = f'/usr/local/bin:/usr/bin:{ self .working_directory } '
282+ # Set env variable DISPLAY which you need to open the oidc window automatically
283+ env ['DISPLAY' ] = ':0'
284+ env ['HOME' ] = self .working_directory
285+ if kubeconfig :
286+ env ['KUBECONFIG' ] = kubeconfig
287+
288+ # Run the subprocess with the environment
289+ result = subprocess .run (command , shell = shell , capture_output = capture_output , text = text , check = True , env = env )
290+
176291 return result
292+
177293 except subprocess .CalledProcessError as error :
178294 logger .error (f"{ error_msg } : { error } " )
179295 raise
0 commit comments