diff --git a/src/k8s-extension/HISTORY.rst b/src/k8s-extension/HISTORY.rst index 2d46c7b4699..5dfc11ceb0e 100644 --- a/src/k8s-extension/HISTORY.rst +++ b/src/k8s-extension/HISTORY.rst @@ -3,6 +3,10 @@ Release History =============== +1.7.0 ++++++++++++++++++++ +* Added the `az k8s-extension troubleshoot` command to simplify log collection and diagnostics for extensions. + 1.6.7 +++++++++++++++++++ * microsoft.azuremonitor.containers: Extend ContainerInsights Extension for high log scale mode support. diff --git a/src/k8s-extension/azext_k8s_extension/__init__.py b/src/k8s-extension/azext_k8s_extension/__init__.py index 5241483b0b4..964503477ad 100644 --- a/src/k8s-extension/azext_k8s_extension/__init__.py +++ b/src/k8s-extension/azext_k8s_extension/__init__.py @@ -5,9 +5,11 @@ from azure.cli.core import AzCommandsLoader from . import consts +from typing import Union from ._help import helps # pylint: disable=unused-import +from knack.commands import CLICommand class K8sExtensionCommandsLoader(AzCommandsLoader): @@ -20,7 +22,7 @@ def __init__(self, cli_ctx=None): super().__init__(cli_ctx=cli_ctx, custom_command_type=k8s_extension_custom) - def load_command_table(self, args): + def load_command_table(self, args: Union[list[str], None]) -> dict[str, CLICommand]: from .commands import load_command_table from azure.cli.core.aaz import load_aaz_command_table try: @@ -34,9 +36,10 @@ def load_command_table(self, args): args=args ) load_command_table(self, args) - return self.command_table + command_table: dict[str, CLICommand] = self.command_table + return command_table - def load_arguments(self, command): + def load_arguments(self, command: CLICommand): from ._params import load_arguments load_arguments(self, command) diff --git a/src/k8s-extension/azext_k8s_extension/_help.py b/src/k8s-extension/azext_k8s_extension/_help.py index b16f5d61505..9480ab9fdda 100644 --- a/src/k8s-extension/azext_k8s_extension/_help.py +++ b/src/k8s-extension/azext_k8s_extension/_help.py @@ 
-94,6 +94,18 @@ --config-protected-file=protected-settings-file """ +helps[f'{consts.EXTENSION_NAME} troubleshoot'] = f""" + type: command + short-summary: Perform diagnostic checks on a Kubernetes Extension. + long-summary: This command is used to troubleshoot a Kubernetes Extension. It \ +collects logs and other information that can be used to diagnose issues with the extension. + examples: + - name: Troubleshoot a Kubernetes Extension + text: |- + az {consts.EXTENSION_NAME} troubleshoot --name extension-name \ +--namespace-list "namespace1,namespace2" +""" + helps[f'{consts.EXTENSION_NAME} extension-types'] = """ type: group short-summary: Commands to discover Kubernetes Extension Types. diff --git a/src/k8s-extension/azext_k8s_extension/_params.py b/src/k8s-extension/azext_k8s_extension/_params.py index 48db687fcc8..3f394715205 100644 --- a/src/k8s-extension/azext_k8s_extension/_params.py +++ b/src/k8s-extension/azext_k8s_extension/_params.py @@ -15,8 +15,9 @@ AddConfigurationProtectedSettings, ) +from knack.commands import CLICommand -def load_arguments(self, _): +def load_arguments(self, _: CLICommand) -> None: with self.argument_context(consts.EXTENSION_NAME) as c: c.argument('location', validator=get_default_location_from_resource_group) @@ -131,3 +132,20 @@ def load_arguments(self, _): c.argument('show_latest', arg_type=get_three_state_flag(), help='Filter results by only the latest version. For example, if this flag is used the latest version of the extensionType will be shown.') + + with self.argument_context(f"{consts.EXTENSION_NAME} troubleshoot") as c: + c.argument('name', + options_list=['--name', '-n'], + help='Name of the Kubernetes extension') + c.argument('namespace_list', + options_list=['--namespace-list'], + help='Comma-separated list of namespaces to troubleshoot') + c.argument('kube_config', + options_list=['--kube-config'], + help='Path to the kube config file. 
If not specified, the default kube config file will be used.') + c.argument('kube_context', + options_list=['--kube-context'], + help='Kubeconfig context from current machine. If not specified, the current context from kube config file will be used.') + c.argument('skip_ssl_verification', + action="store_true", + help='Skip SSL verification for any cluster connection.') diff --git a/src/k8s-extension/azext_k8s_extension/commands.py b/src/k8s-extension/azext_k8s_extension/commands.py index 9875c6b1231..69c10cbac51 100644 --- a/src/k8s-extension/azext_k8s_extension/commands.py +++ b/src/k8s-extension/azext_k8s_extension/commands.py @@ -23,6 +23,7 @@ def load_command_table(self, _): g.custom_command('list', 'list_k8s_extension', table_transformer=k8s_extension_list_table_format) g.custom_show_command('show', 'show_k8s_extension', table_transformer=k8s_extension_show_table_format) g.custom_command('update', 'update_k8s_extension', supports_no_wait=True) + g.custom_command('troubleshoot', 'troubleshoot_extension', is_preview=True) # Subgroup - k8s-extension extension-types k8s_cluster_extension_type_sdk = CliCommandType( diff --git a/src/k8s-extension/azext_k8s_extension/consts.py b/src/k8s-extension/azext_k8s_extension/consts.py index 0de1ce60576..92d3d74ee60 100644 --- a/src/k8s-extension/azext_k8s_extension/consts.py +++ b/src/k8s-extension/azext_k8s_extension/consts.py @@ -26,3 +26,27 @@ HYBRIDCONTAINERSERVICE_API_VERSION = "2022-05-01-preview" EXTENSION_TYPE_API_VERSION = "2023-05-01-preview" + +# Fault type constants for error categorization. +# Used to classify different types of faults encountered during diagnostics. +LOAD_KUBECONFIG_FAULT_TYPE = "kubeconfig-load-error" # Error loading kubeconfig file. + +# Warning messages for diagnostic failures. +KUBECONFIG_LOAD_FAILED_WARNING = """Unable to load the kubeconfig file. 
+Please check +https://learn.microsoft.com/en-us/azure/azure-arc/kubernetes/diagnose-connection-issues#is-kubeconfig-pointing-to-the-right-cluster""" + +EXTRACT_HELMEXE_FAULT_TYPE = "helm-client-extract-error" # Error extracting Helm client executable. + +HELM_VERSION = "v3.12.2" + +DOWNLOAD_AND_INSTALL_KUBECTL_FAULT_TYPE = "Failed to download and install kubectl" # Error downloading/installing kubectl. + +KUBEAPI_CONNECTIVITY_FAILED_WARNING = """Unable to verify connectivity to the Kubernetes cluster. +Please check https://learn.microsoft.com/en-us/azure/azure-arc/kubernetes/diagnose-connection-issues""" + +KUBERNETES_CONNECTIVITY_FAULT_TYPE = "kubernetes-cluster-connection-error" # Error connecting to Kubernetes cluster. + +# Diagnostic log file path constant. +# Used to specify the name of the file where extension diagnostic logs are stored. +ARC_EXT_DIAGNOSTIC_LOGS = "arc_ext_diagnostic_logs" diff --git a/src/k8s-extension/azext_k8s_extension/custom.py b/src/k8s-extension/azext_k8s_extension/custom.py index c88461bb48e..f7626d06e58 100644 --- a/src/k8s-extension/azext_k8s_extension/custom.py +++ b/src/k8s-extension/azext_k8s_extension/custom.py @@ -5,6 +5,14 @@ # pylint: disable=unused-argument,too-many-locals +import contextlib +import logging +import os +import platform +import shutil +import stat +import time + from .utils import ( get_cluster_rp_api_version, is_dogfood_cluster, @@ -12,8 +20,14 @@ ) from knack.log import get_logger +from azure.cli.core import get_default_cli, telemetry + from azure.cli.core.azclierror import ( + ClientRequestError, + CLIInternalError, + FileOperationError, ResourceNotFoundError, + ManualInterrupt, MutuallyExclusiveArgumentError, RequiredArgumentMissingError, ) @@ -35,11 +49,23 @@ user_confirmation_factory, ) from . import consts +from . 
import utils from ._client_factory import cf_resources -logger = get_logger(__name__) +from kubernetes import client as kube_client +from kubernetes import config +from kubernetes.config.kube_config import KubeConfigMerger +from kubernetes.client.rest import ApiException +from kubernetes.client import CoreV1Api, V1NodeList +from typing import Optional, Union + +import oras.client + +from knack.commands import CLICommand + +logger = get_logger(__name__) # A factory method to return the correct extension class based off of the extension name def ExtensionFactory(extension_name): @@ -529,6 +555,326 @@ def show_extension_type_version_by_cluster( version) +def troubleshoot_extension( + cmd: CLICommand, + name: str, + namespace_list: str, + kube_config: Optional[str] = None, + kube_context: Optional[str] = None, + skip_ssl_verification: bool = False, +) -> None: + + """Troubleshoot an existing Kubernetes Extension.""" + + try: + print("Collecting diagnostics information from the namespaces provided. This operation may take a while to complete ...\n") + + namespaces = [ns.strip() for ns in namespace_list.split(',') if ns.strip()] + + # Check if namespaces list is empty and throw an exception + if not namespaces: + raise RequiredArgumentMissingError( + "No valid namespaces provided. Please provide at least one namespace." + ) + + # Setting the intial values as True + storage_space_available = True + + # Setting kube_config + kube_config = set_kube_config(kube_config) + kube_client.rest.logger.setLevel(logging.WARNING) + + # Loading the kubeconfig file in kubernetes client configuration + load_kube_config(kube_config, kube_context, skip_ssl_verification) + + # Install helm client + helm_client_location = install_helm_client(cmd) + + # Install kubectl client + kubectl_client_location = install_kubectl_client() + + # Checking the connection to kubernetes cluster. 
+ check_kube_connection() + + # Creating timestamp folder to store all the diagnoser logs + diagnostic_logs_folder_name = utils.create_unique_folder_name(name) + + # Generate the diagnostic folder in a given location + filepath_with_timestamp, diagnostic_folder_status = ( + utils.create_folder_diagnosticlogs( + diagnostic_logs_folder_name, consts.ARC_EXT_DIAGNOSTIC_LOGS + ) + ) + + if diagnostic_folder_status is not True: + storage_space_available = False + + api_instance = kube_client.CoreV1Api() + + for namespace in namespaces: + collect_namespace_status = collect_namespace(api_instance,filepath_with_timestamp, namespace) + if collect_namespace_status is not True: + storage_space_available = False + + if storage_space_available: + print( + f"The diagnoser logs have been saved at this path: '{filepath_with_timestamp}'.\n" + "These logs can be attached while filing a " + "support ticket for further assistance.\n" + ) + else: + logger.warning( + "The diagnoser was unable to save logs to your machine. Please check whether sufficient storage is " + "available and run the troubleshoot command again." + ) + + # Handling the user manual interrupt + except KeyboardInterrupt: + raise ManualInterrupt("Process terminated externally.") + +def collect_namespace(api_instance: CoreV1Api, base_path: str, namespace: str) -> bool: + print(f"Step: {utils.get_utctimestring()}: Collecting diagnostics information for namespace '{namespace}'...") + + collection_success = True + + if not utils.check_namespace_exists(api_instance, namespace): + logger.warning(f"Namespace '{namespace}' does not exist. 
Skipping...") + return collection_success + + folder_namespace, folder_namespace_status = ( + utils.create_folder_diagnostics_namespace(base_path, namespace) + ) + + if folder_namespace_status is not True: + logger.error(f"Failed to create diagnostics folder for namespace '{namespace}'.") + collection_success = False + + logs_status = ( + utils.collect_namespace_configmaps(api_instance, folder_namespace, namespace) + ) + + if logs_status is not True: + collection_success = False + + pods_status = ( + utils.walk_through_pods(api_instance, folder_namespace, namespace) + ) + + if pods_status is not True: + collection_success = False + + return collection_success + +def set_kube_config(kube_config: Union[str, None]) -> Union[str, None]: + print(f"Step: {utils.get_utctimestring()}: Setting KubeConfig") + if kube_config: + # Trim kubeconfig. This is required for windows os. + if kube_config.startswith(("'", '"')): + kube_config = kube_config[1:] + if kube_config.endswith(("'", '"')): + kube_config = kube_config[:-1] + return kube_config + return None + +def load_kube_config( + kube_config: Union[str, None], kube_context: Union[str, None], skip_ssl_verification: bool +) -> None: + try: + config.load_kube_config(config_file=kube_config, context=kube_context) + if skip_ssl_verification: + from kubernetes.client import Configuration + + default_config = Configuration.get_default_copy() + default_config.verify_ssl = False + Configuration.set_default(default_config) + except Exception as e: + telemetry.set_exception( + exception=e, + fault_type=consts.LOAD_KUBECONFIG_FAULT_TYPE, + summary="Problem loading the kubeconfig file", + ) + logger.warning(consts.KUBECONFIG_LOAD_FAILED_WARNING) + raise FileOperationError("Problem loading the kubeconfig file. 
" + str(e)) + +def install_helm_client(cmd: CLICommand) -> str: + print( + f"Step: {utils.get_utctimestring()}: Install Helm client if it does not exist" + ) + # Return helm client path set by user + helm_client_path = os.getenv("HELM_CLIENT_PATH") + if helm_client_path: + return helm_client_path + + # Fetch system related info + operating_system = platform.system().lower() + machine_type = platform.machine() + + # Send machine telemetry + telemetry.add_extension_event( + "connectedk8s", {"Context.Default.AzureCLI.MachineType": machine_type} + ) + # Set helm binary download & install locations + if operating_system == "windows": + download_location_string = f".azure\\helm\\{consts.HELM_VERSION}" + download_file_name = f"helm-{consts.HELM_VERSION}-{operating_system}-amd64.zip" + install_location_string = ( + f".azure\\helm\\{consts.HELM_VERSION}\\{operating_system}-amd64\\helm.exe" + ) + artifactTag = f"helm-{consts.HELM_VERSION}-{operating_system}-amd64" + elif operating_system == "linux" or operating_system == "darwin": + download_location_string = f".azure/helm/{consts.HELM_VERSION}" + download_file_name = ( + f"helm-{consts.HELM_VERSION}-{operating_system}-amd64.tar.gz" + ) + install_location_string = ( + f".azure/helm/{consts.HELM_VERSION}/{operating_system}-amd64/helm" + ) + artifactTag = f"helm-{consts.HELM_VERSION}-{operating_system}-amd64" + else: + telemetry.set_exception( + exception="Unsupported OS for installing helm client", + fault_type=consts.Helm_Unsupported_OS_Fault_Type, + summary=f"{operating_system} is not supported for installing helm client", + ) + raise ClientRequestError( + f"The {operating_system} platform is not currently supported for installing helm client." 
+    )
+
+    download_location = os.path.expanduser(os.path.join("~", download_location_string))
+    download_dir = os.path.dirname(download_location)
+    install_location = os.path.expanduser(os.path.join("~", install_location_string))
+
+    # Download compressed Helm binary if not already present
+    if not os.path.isfile(install_location):
+        # Creating the helm folder if it doesn't exist
+        if not os.path.exists(download_dir):
+            try:
+                os.makedirs(download_dir)
+            except Exception as e:
+                telemetry.set_exception(
+                    exception=e,
+                    fault_type="helm-directory-creation-error",  # NOTE(review): consts.Create_Directory_Fault_Type is not defined in consts.py; inlined to avoid AttributeError
+                    summary="Unable to create helm directory",
+                )
+                raise ClientRequestError("Failed to create helm directory. " + str(e))
+
+        # Downloading compressed helm client executable
+        logger.warning(
+            "Downloading helm client for first time. This can take few minutes..."
+        )
+
+        mcr_url = utils.get_mcr_path(cmd)
+
+        client = oras.client.OrasClient(hostname=mcr_url)
+        retry_count = 3
+        retry_delay = 5
+        for i in range(retry_count):
+            try:
+                client.pull(
+                    target=f"{mcr_url}/oss/kubernetes/helm:{artifactTag}",  # NOTE(review): consts.HELM_MCR_URL is not defined in consts.py; inlined the connectedk8s artifact path — TODO confirm
+                    outdir=download_location,
+                )
+                break
+            except Exception as e:
+                if i == retry_count - 1:
+                    if "Connection reset by peer" in str(e):
+                        telemetry.set_user_fault()
+                    telemetry.set_exception(
+                        exception=e,
+                        fault_type="helm-client-download-error",  # NOTE(review): consts.Download_Helm_Fault_Type is not defined in consts.py; inlined to avoid AttributeError
+                        summary="Unable to download helm client.",
+                    )
+                    raise CLIInternalError(
+                        f"Failed to download helm client: {e}",
+                        recommendation="Please check your internet connection.",
+                    )
+                time.sleep(retry_delay)
+
+        # Extract the archive.
+ try: + extract_dir = download_location + download_location = os.path.expanduser( + os.path.join(download_location, download_file_name) + ) + shutil.unpack_archive(download_location, extract_dir) + os.chmod(install_location, os.stat(install_location).st_mode | stat.S_IXUSR) + except Exception as e: + telemetry.set_exception( + exception=e, + fault_type=consts.EXTRACT_HELMEXE_FAULT_TYPE, + summary="Unable to extract helm executable", + ) + reco_str = f"Please ensure that you delete the directory '{extract_dir}' before trying again." + raise ClientRequestError( + "Failed to extract helm executable." + str(e), recommendation=reco_str + ) + + return install_location + +def install_kubectl_client() -> str: + print( + f"Step: {utils.get_utctimestring()}: Install Kubectl client if it does not exist" + ) + # Return kubectl client path set by user + kubectl_client_path = os.getenv("KUBECTL_CLIENT_PATH") + if kubectl_client_path: + return kubectl_client_path + + try: + # Fetching the current directory where the cli installs the kubectl executable + home_dir = os.path.expanduser("~") + kubectl_filepath = os.path.join(home_dir, ".azure", "kubectl-client") + + with contextlib.suppress(FileExistsError): + os.makedirs(kubectl_filepath) + + # Setting path depending on the OS being used + operating_system = platform.system().lower() + kubectl = "kubectl.exe" if operating_system == "windows" else "kubectl" + kubectl_path = os.path.join(kubectl_filepath, kubectl) + + if os.path.isfile(kubectl_path): + return kubectl_path + + # Downloading kubectl executable if its not present in the machine + logger.warning( + "Downloading kubectl client for first time. This can take few minutes..." 
+ ) + logging.disable(logging.CRITICAL) + get_default_cli().invoke( + ["aks", "install-cli", "--install-location", kubectl_path] + ) + logging.disable(logging.NOTSET) + logger.warning("\n") + # Return the path of the kubectl executable + return kubectl_path + + except Exception as e: + telemetry.set_exception( + exception=e, + fault_type=consts.DOWNLOAD_AND_INSTALL_KUBECTL_FAULT_TYPE, + summary="Failed to download and install kubectl", + ) + raise CLIInternalError(f"Unable to install kubectl. Error: {e}") + +def check_kube_connection() -> str: + print(f"Step: {utils.get_utctimestring()}: Checking Connectivity to Cluster") + api_instance = kube_client.VersionApi() + try: + api_response = api_instance.get_code() + git_version: str = api_response.git_version + return git_version + except Exception as e: # pylint: disable=broad-except + logger.warning(consts.KUBEAPI_CONNECTIVITY_FAILED_WARNING) + utils.kubernetes_exception_handler( + e, + consts.KUBERNETES_CONNECTIVITY_FAULT_TYPE, + "Unable to verify connectivity to the Kubernetes cluster", + ) + + raise CLIInternalError( + "Unable to verify connectivity to the Kubernetes cluster. No version information could be retrieved.") + def __create_identity(cmd, resource_group_name, cluster_name, cluster_type, cluster_rp): subscription_id = get_subscription_id(cmd.cli_ctx) resources = cf_resources(cmd.cli_ctx, subscription_id) diff --git a/src/k8s-extension/azext_k8s_extension/utils.py b/src/k8s-extension/azext_k8s_extension/utils.py index 66740036efa..767550ccef8 100644 --- a/src/k8s-extension/azext_k8s_extension/utils.py +++ b/src/k8s-extension/azext_k8s_extension/utils.py @@ -4,11 +4,25 @@ # -------------------------------------------------------------------------------------------- import json -from typing import Tuple +import shutil +import time +import re +import os +import contextlib + +from typing import Tuple, Union from urllib.parse import urlparse from . 
import consts -from azure.cli.core.azclierror import InvalidArgumentValueError, RequiredArgumentMissingError +from azure.cli.core import telemetry +from azure.cli.core.azclierror import InvalidArgumentValueError, RequiredArgumentMissingError, ValidationError + +from kubernetes.client import CoreV1Api, V1NodeList +from kubernetes.client.rest import ApiException + +from knack.log import get_logger +from knack.commands import CLICommand +logger = get_logger(__name__) def get_cluster_rp_api_version(cluster_type, cluster_rp=None) -> Tuple[str, str]: if cluster_type.lower() == consts.PROVISIONED_CLUSTER_TYPE: @@ -68,3 +82,313 @@ def is_skip_prerequisites_specified(configuration_settings): has_skip_prerequisites_set = True return has_skip_prerequisites_set + +def get_utctimestring() -> str: + return time.strftime("%Y-%m-%dT%H-%M-%SZ", time.gmtime()) + +def validate_node_api_response(api_instance: CoreV1Api) -> Union[V1NodeList, None]: + try: + node_api_response = api_instance.list_node() + return node_api_response + except Exception: + logger.debug( + "Error occurred while listing nodes on this kubernetes cluster:", + exc_info=True, + ) + return None + +def kubernetes_exception_handler( + ex: Exception, + fault_type: str, + summary: str, + error_message: str = "Error occurred while connecting to the kubernetes cluster: ", + message_for_unauthorized_request: str = "The user does not have required privileges on the " + "kubernetes cluster to deploy Azure Arc enabled Kubernetes agents. 
Please " + "ensure you have cluster admin privileges on the cluster to onboard.", + message_for_not_found: str = "The requested kubernetes resource was not found.", + raise_error: bool = True, +) -> None: + telemetry.set_user_fault() + if isinstance(ex, ApiException): + status_code = ex.status + if status_code == 403: + logger.error(message_for_unauthorized_request) + elif status_code == 404: + logger.error(message_for_not_found) + else: + logger.debug("Kubernetes Exception: ", exc_info=True) + if raise_error: + telemetry.set_exception( + exception=ex, fault_type=fault_type, summary=summary + ) + raise ValidationError(error_message + "\nError Response: " + str(ex.body)) + else: + if raise_error: + telemetry.set_exception( + exception=ex, fault_type=fault_type, summary=summary + ) + raise ValidationError(error_message + "\nError: " + str(ex)) + + logger.debug("Kubernetes Exception", exc_info=True) + +def create_unique_folder_name(base_name: str) -> str: + """Create a unique folder name using the base name and current timestamp. 
+ + Args: + base_name: The base name for the folder (will be sanitized) + + Returns: + A string in the format: sanitized_base_name-YYYY-MM-DD-HH.MM.SS + + Example: + create_unique_folder_name("name1@#$") -> "name1-2025-08-05-15.59.26" + """ + + sanitized_base_name = re.sub(r'[^a-zA-Z0-9_-]', '', base_name) + timestamp = time.strftime("%Y-%m-%d-%H.%M.%S", time.localtime()) + return f"{sanitized_base_name}-{timestamp}" + +def create_folder_diagnostics_namespace(base_folder: str, namespace: str) -> tuple[str, bool]: + print( + f"Step: {get_utctimestring()}: Creating folder for namespace '{namespace}'" + ) + namespace_folder_name = os.path.join(base_folder, namespace) + try: + os.makedirs(namespace_folder_name, exist_ok=True) + except Exception as e: + logger.error(f"Failed to create diagnostics folder for namespace '{namespace}': {e}") + return "", False + + return namespace_folder_name, True + +def collect_namespace_configmaps(api_instance, namespace_folder_name: str, namespace: str) -> bool: + print( + f"Step: {get_utctimestring()}: Collecting configurations for namespace '{namespace}'" + ) + + namespace_configuration_folder_name = os.path.join(namespace_folder_name, "configuration") + try: + os.makedirs(namespace_configuration_folder_name, exist_ok=True) + except Exception as e: + logger.error(f"Failed to create configuration folder for namespace '{namespace}': {e}") + return False + + config_maps = api_instance.list_namespaced_config_map(namespace) + + for cm in config_maps.items: + cm_name = cm.metadata.name + cm_data = cm.data or {} + cm_file_path = os.path.join(namespace_configuration_folder_name, f"{cm_name}.json") + + try: + with open(cm_file_path, "w") as f: + json.dump(cm_data, f, indent=4) + except Exception as e: + logger.error(f"Failed to save ConfigMap '{cm_name}': {e}") + return False + + return True + +def walk_through_pods(api_instance: CoreV1Api, folder_namespace: str, namespace: str) -> bool: + print( + f"Step: {get_utctimestring()}: Collecting 
information from pods in namespace '{namespace}'" + ) + + try: + pods = api_instance.list_namespaced_pod(namespace) + except Exception as e: + logger.error(f"Failed to list pods in namespace '{namespace}': {e}") + return False + + pods_folder_name = os.path.join(folder_namespace, "pods") + try: + os.makedirs(pods_folder_name, exist_ok=True) + except Exception as e: + logger.error(f"Failed to create pods folder for namespace '{namespace}': {e}") + return False + + for pod in pods.items: + pod_name = pod.metadata.name + pod_information_status = collect_pod_information(api_instance, pods_folder_name, namespace, pod) + if not pod_information_status: + logger.error(f"Failed to collect information for pod '{pod_name}'") + return False + containers_folder_name = os.path.join(pods_folder_name, pod_name, "containers") + try: + os.makedirs(containers_folder_name, exist_ok=True) + except Exception as e: + logger.error(f"Failed to create containers folder for pod '{pod_name}': {e}") + return False + + if pod.spec is not None: + if pod.spec.init_containers is not None: + for init_container in pod.spec.init_containers: + init_container_logs_status = collect_container_logs(api_instance, containers_folder_name, namespace, pod_name, init_container) + if not init_container_logs_status: + logger.error(f"Failed to collect logs from init container '{init_container.name}' in pod '{pod_name}'") + if pod.spec.containers is not None: + for container in pod.spec.containers: + container_logs_status = collect_container_logs(api_instance, containers_folder_name, namespace, pod_name, container) + if not container_logs_status: + logger.error(f"Failed to collect logs from container '{container.name}' in pod '{pod_name}'") + return False + + return True + +def collect_container_logs(api_instance: CoreV1Api, containers_folder_name: str, namespace: str, pod_name: str, container) -> bool: + print( + f"Step: {get_utctimestring()}: Collecting logs from container '{container.name}' in pod '{pod_name}'" 
+ ) + + container_name = container.name + + container_log = api_instance.read_namespaced_pod_log( + name=pod_name, container=container_name, namespace=namespace + ) + + container_logs_file_name = os.path.join(containers_folder_name, f"{container_name}_logs.txt") + + try: + with open(container_logs_file_name, "w") as logs_file: + logs_file.write(str(container_log)) + except Exception as e: + logger.error(f"Failed to save logs for container '{container_name}' in pod '{pod_name}': {e}") + return False + + return True + +def convert_to_pod_dict(pod) -> dict: + if pod.metadata is None or pod.status is None: + return None + pod_metadata = pod.metadata + pod_status = pod.status.phase + return { + "name": pod_metadata.name, + "namespace": pod_metadata.namespace, + "labels": pod_metadata.labels, + "annotations": pod_metadata.annotations, + "status": pod_status, + } + +def collect_pod_information(api_instance: CoreV1Api, pods_folder_name: str, namespace: str, pod) -> bool: + pod_metadata = convert_to_pod_dict(pod) + if pod_metadata is None: + logger.error(f"Failed to collect metadata for pod in namespace '{namespace}'") + return False + + pod_name = pod_metadata["name"] + print( + f"Step: {get_utctimestring()}: Collecting information for pod '{pod_name}' in namespace '{namespace}'" + ) + + pod_folder_name = os.path.join(pods_folder_name, pod_name) + try: + os.makedirs(pod_folder_name, exist_ok=True) + except Exception as e: + logger.error(f"Failed to create folder for pod '{pod_name}': {e}") + return False + + return save_pod_metadata(pod_folder_name, pod_metadata) + +def save_pod_metadata(pod_folder_name: str, pod: dict) -> bool: + metadata_file = os.path.join(pod_folder_name, "metadata.json") + + return save_as_json(metadata_file, pod) + +def save_as_json(destination: str, data) -> bool: + try: + with open(destination, "w") as f: + json.dump(data, f, indent=4) + return True + except Exception as e: + logger.error(f"Failed to save data to {destination}: {e}") + return False 
+
+def create_folder_diagnosticlogs(folder_name: str, base_folder_name: str) -> tuple[str, bool]:
+    print(
+        f"Step: {get_utctimestring()}: Creating folder for extension Diagnostic Checks Logs"
+    )
+    try:
+        # Fetching path to user directory to create the arc diagnostic folder
+        home_dir = os.path.expanduser("~")
+        filepath = os.path.join(home_dir, ".azure", base_folder_name)
+        # Creating Diagnostic folder and its subfolder with the given timestamp and cluster name to store all the logs
+        with contextlib.suppress(FileExistsError):
+            os.mkdir(filepath)
+        filepath_with_timestamp = os.path.join(filepath, folder_name)
+        try:
+            os.mkdir(filepath_with_timestamp)
+        except FileExistsError:
+            # Deleting the folder if present with the same timestamp to prevent overriding in the same folder and then
+            # creating it again
+            shutil.rmtree(filepath_with_timestamp, ignore_errors=True)
+            os.mkdir(filepath_with_timestamp)
+
+        return filepath_with_timestamp, True
+
+    # For handling storage or OS exception that may occur during the execution
+    except OSError as e:
+        if "[Errno 28]" in str(e):
+            if filepath_with_timestamp:
+                shutil.rmtree(filepath_with_timestamp, ignore_errors=False)
+            telemetry.set_exception(
+                exception=e,
+                fault_type="no-storage-space-error",  # NOTE(review): consts.No_Storage_Space_Available_Fault_Type is not defined in consts.py; inlined to avoid AttributeError
+                summary="No space left on device",
+            )
+            return "", False
+        logger.exception(
+            "An exception has occurred while creating the diagnostic logs folder in "
+            "your local machine."
+        )
+        telemetry.set_exception(
+            exception=e,
+            fault_type="diagnostics-folder-creation-error",  # NOTE(review): consts.Diagnostics_Folder_Creation_Failed_Fault_Type is not defined in consts.py; inlined to avoid AttributeError
+            summary="Error while trying to create diagnostic logs folder",
+        )
+        return "", False
+
+    # To handle any exception that may occur during the execution
+    except Exception as e:
+        logger.exception(
+            "An exception has occurred while creating the diagnostic logs folder in "
+            "your local machine."
+        )
+        telemetry.set_exception(
+            exception=e,
+            fault_type="diagnostics-folder-creation-error",  # NOTE(review): consts.Diagnostics_Folder_Creation_Failed_Fault_Type is not defined in consts.py; inlined to avoid AttributeError
+            summary="Error while trying to create diagnostic logs folder",
+        )
+        return "", False
+
+def get_mcr_path(cmd: CLICommand) -> str:
+    active_directory_array = cmd.cli_ctx.cloud.endpoints.active_directory.split(".")
+
+    # default for public, mc, ff clouds
+    mcr_postfix = active_directory_array[2]
+    # special cases for USSec, exclude part of suffix
+    if len(active_directory_array) == 4 and active_directory_array[2] == "microsoft":
+        mcr_postfix = active_directory_array[3]
+    # special case for USNat
+    elif len(active_directory_array) == 5:
+        mcr_postfix = (
+            active_directory_array[2]
+            + "."
+            + active_directory_array[3]
+            + "."
+            + active_directory_array[4]
+        )
+
+    mcr_url = f"mcr.microsoft.{mcr_postfix}"
+    return mcr_url
+
+def check_namespace_exists(api_instance, namespace: str) -> bool:
+    print(f"Step: {get_utctimestring()}: Checking if namespace '{namespace}' exists...")
+    try:
+        api_instance.read_namespace(name=namespace)
+        return True
+    except ApiException as e:
+        if e.status == 404:
+            return False
+        else:
+            raise  # Re-raise other exceptions
diff --git a/src/k8s-extension/setup.py b/src/k8s-extension/setup.py
index ecd79402ef7..08f8a1bac79 100644
--- a/src/k8s-extension/setup.py
+++ b/src/k8s-extension/setup.py
@@ -30,10 +30,12 @@
     "License :: OSI Approved :: MIT License",
 ]
 
-# TODO: Add any additional SDK dependencies here
-DEPENDENCIES = []
+DEPENDENCIES = [
+    "kubernetes==24.2.0",
+    "oras==0.2.25",
+]
 
-VERSION = "1.6.7"
+VERSION = "1.7.0"
 
 with open("README.rst", "r", encoding="utf-8") as f:
     README = f.read()