diff --git a/managed/devops/opscli/setup.py b/managed/devops/opscli/setup.py index 87241054063a..a4a683450f1c 100644 --- a/managed/devops/opscli/setup.py +++ b/managed/devops/opscli/setup.py @@ -23,6 +23,7 @@ 'ybops/cloud/gcp', 'ybops/cloud/onprem', 'ybops/cloud/azure', + 'ybops/cloud/kubernetes', 'ybops/common', 'ybops/node_agent', 'ybops/utils' diff --git a/managed/devops/opscli/ybops/cloud/common/method.py b/managed/devops/opscli/ybops/cloud/common/method.py index 0541efab31d1..4757c098232d 100644 --- a/managed/devops/opscli/ybops/cloud/common/method.py +++ b/managed/devops/opscli/ybops/cloud/common/method.py @@ -365,6 +365,11 @@ def get_server_host_port(self, host_info, custom_ssh_port, default_port=False): "node_agent_ip": host_info["private_ip"], "node_agent_port": self.extra_vars["node_agent_port"] } + if connection_type == 'kubectl': + return { + "kubectl_pod": host_info["name"], + "kubectl_namespace": host_info.get("namespace", "default") + } raise YBOpsRuntimeError("Unknown connection type: {}".format(connection_type)) def get_server_ports_to_check(self, args): @@ -379,6 +384,9 @@ def get_server_ports_to_check(self, args): server_ports.append(int(args.custom_ssh_port)) elif connection_type == 'node_agent_rpc': server_ports.append(self.extra_vars["node_agent_port"]) + elif connection_type == 'kubectl': + # Kubectl doesn't use ports, return empty list + pass return list(set(server_ports)) diff --git a/managed/devops/opscli/ybops/cloud/kubernetes/__init__.py b/managed/devops/opscli/ybops/cloud/kubernetes/__init__.py new file mode 100644 index 000000000000..541335027e2a --- /dev/null +++ b/managed/devops/opscli/ybops/cloud/kubernetes/__init__.py @@ -0,0 +1 @@ +# Keep \ No newline at end of file diff --git a/managed/devops/opscli/ybops/cloud/kubernetes/cloud.py b/managed/devops/opscli/ybops/cloud/kubernetes/cloud.py new file mode 100644 index 000000000000..863542ed1558 --- /dev/null +++ b/managed/devops/opscli/ybops/cloud/kubernetes/cloud.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python + +import logging +import subprocess +from ybops.cloud.common.cloud import AbstractCloud +from ybops.cloud.common.method import AbstractInstancesMethod +from ybops.cloud.kubernetes.command import KubernetesInstanceCommand, KubernetesAccessCommand +from ybops.common.exceptions import YBOpsRuntimeError + + +class KubernetesCloud(AbstractCloud): + """Subclass specific to Kubernetes cloud related functionality. + Commands are executed in target pods using kubectl exec. + """ + def __init__(self): + super(KubernetesCloud, self).__init__("kubernetes") + + def add_extra_args(self): + """Override to setup cloud-specific command line flags. + """ + super(KubernetesCloud, self).add_extra_args() + self.parser.add_argument("--namespace", required=False, default="default", + help="Kubernetes namespace") + self.parser.add_argument("--kubeconfig", required=False, + help="Path to kubeconfig file") + + def add_subcommands(self): + """Override to setup the cloud-specific instances of the subcommands. + """ + self.add_subcommand(KubernetesInstanceCommand()) + self.add_subcommand(KubernetesAccessCommand()) + + def _find_pod_by_labels(self, node_name, namespace, kubeconfig=None): + """Find the actual pod name using Kubernetes label selectors. + + Args: + node_name: Simplified node name (e.g., yb-master-0_eu-west-1a or yb-master-0) + namespace: Kubernetes namespace + kubeconfig: Path to kubeconfig file (optional) + + Returns: + Tuple of (pod_name, container_name) where container_name is 'yb-master' or 'yb-tserver' + """ + # Parse the node name to extract server type, index, and zone + # Format: yb-master-0 or yb-master-0_eu-west-1a + parts = node_name.split('_') + base_name = parts[0] # yb-master-0 or yb-tserver-0 + zone = parts[1] if len(parts) > 1 else None # eu-west-1a (optional) + + # Determine server type and extract pod index + # The server type is also the container name + if base_name.startswith('yb-master-'): + server_type = 'yb-master' + pod_index = base_name[len('yb-master-'):] + elif base_name.startswith('yb-tserver-'): + server_type = 'yb-tserver' + pod_index = base_name[len('yb-tserver-'):] + else: + raise YBOpsRuntimeError( + f"Invalid node name format: {node_name}. Expected yb-master-N or yb-tserver-N") + + # Build kubectl command with label selectors + kubectl_cmd = ["kubectl", "get", "pods"] + + if kubeconfig: + kubectl_cmd.extend(["--kubeconfig", kubeconfig]) + + kubectl_cmd.extend(["-n", namespace]) + + # Build label selector (comma-separated for AND logic) + labels = f"app.kubernetes.io/name={server_type},apps.kubernetes.io/pod-index={pod_index}" + if zone: + labels += f",yugabyte.io/zone={zone}" + + kubectl_cmd.extend(["-l", labels]) + + kubectl_cmd.extend(["-o", "jsonpath={.items[0].metadata.name}"]) + + logging.debug(f"Finding pod with command: {' '.join(kubectl_cmd)}") + + try: + result = subprocess.run( + kubectl_cmd, + capture_output=True, + text=True, + check=True + ) + + if not result.stdout.strip(): + zone_info = f", zone={zone}" if zone else "" + raise YBOpsRuntimeError( + f"No pod found for node '{node_name}' with labels " + f"component={server_type}, pod-index={pod_index}{zone_info}") + + pod_name = result.stdout.strip() + return pod_name, server_type + + except subprocess.CalledProcessError as e: + raise YBOpsRuntimeError(f"Failed to find pod for node '{node_name}': {e.stderr}") + + def get_host_info(self, args, get_all=False): + """Override to get host info for Kubernetes pods. + + For kubernetes, search_pattern is the simplified node name (e.g., yb-tserver-0_eu-west-1b). + We use label selectors to find the actual pod name. + """ + node_name = args.search_pattern if hasattr(args, 'search_pattern') else None + + if not node_name: + return None + + namespace = args.namespace if hasattr(args, 'namespace') else "default" + kubeconfig = args.kubeconfig if hasattr(args, 'kubeconfig') else None + + # Find the actual pod name and container using label selectors + actual_pod_name, container_name = self._find_pod_by_labels(node_name, namespace, kubeconfig) + + result = dict( + name=actual_pod_name, # Use actual pod name for kubectl exec + # For kubernetes, we use kubectl exec, not direct IP + public_ip=actual_pod_name, + private_ip=actual_pod_name, + region=args.region if hasattr(args, 'region') else "local", + zone=args.zone if hasattr(args, 'zone') else "local", + namespace=namespace, + container=container_name, # Include container name derived from node name + instance_type="kubernetes-pod", + server_type=AbstractInstancesMethod.YB_SERVER_TYPE + ) + + if get_all: + return [result] + return result diff --git a/managed/devops/opscli/ybops/cloud/kubernetes/command.py b/managed/devops/opscli/ybops/cloud/kubernetes/command.py new file mode 100644 index 000000000000..4e99fcbd8df5 --- /dev/null +++ b/managed/devops/opscli/ybops/cloud/kubernetes/command.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +from ybops.cloud.common.command import InstanceCommand, AccessCommand +from ybops.cloud.common.method import ( + AccessEditVaultMethod, AccessCreateVaultMethod, AccessDeleteKeyMethod +) +from ybops.cloud.kubernetes.method import ( + KubernetesCreateInstancesMethod, KubernetesProvisionInstancesMethod, + KubernetesListInstancesMethod, KubernetesAccessAddKeyMethod, + KubernetesConfigureInstancesMethod, KubernetesInitYSQLMethod, + KubernetesCronCheckMethod, KubernetesTransferXClusterCerts, + KubernetesRunHooks, KubernetesWaitForConnection, + KubernetesManageOtelCollector +) + + +class KubernetesInstanceCommand(InstanceCommand): + """Subclass for Kubernetes specific instance command. + Most methods are reused from common, only override what's specific to kubernetes. + """ + def __init__(self): + super(KubernetesInstanceCommand, self).__init__() + + def add_methods(self): + # All Kubernetes methods use kubectl exec instead of SSH + self.add_method(KubernetesProvisionInstancesMethod(self)) + self.add_method(KubernetesCreateInstancesMethod(self)) + self.add_method(KubernetesListInstancesMethod(self)) + self.add_method(KubernetesConfigureInstancesMethod(self)) + self.add_method(KubernetesInitYSQLMethod(self)) + self.add_method(KubernetesCronCheckMethod(self)) + self.add_method(KubernetesTransferXClusterCerts(self)) + self.add_method(KubernetesRunHooks(self)) + self.add_method(KubernetesWaitForConnection(self)) + self.add_method(KubernetesManageOtelCollector(self)) + + +class KubernetesAccessCommand(AccessCommand): + """Subclass for Kubernetes specific access command. + """ + def __init__(self): + super(KubernetesAccessCommand, self).__init__() + + def add_methods(self): + self.add_method(KubernetesAccessAddKeyMethod(self)) + self.add_method(AccessCreateVaultMethod(self)) + self.add_method(AccessEditVaultMethod(self)) + self.add_method(AccessDeleteKeyMethod(self)) diff --git a/managed/devops/opscli/ybops/cloud/kubernetes/method.py b/managed/devops/opscli/ybops/cloud/kubernetes/method.py new file mode 100644 index 000000000000..3385c5a2f1b8 --- /dev/null +++ b/managed/devops/opscli/ybops/cloud/kubernetes/method.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python + +from ybops.cloud.common.method import ( + AbstractAccessMethod, CreateInstancesMethod, + ProvisionInstancesMethod, ListInstancesMethod, + ConfigureInstancesMethod, InitYSQLMethod, CronCheckMethod, + TransferXClusterCerts, RunHooks, WaitForConnection, ManageOtelCollector +) +import json +import logging +from six import iteritems + + +class KubernetesCreateInstancesMethod(CreateInstancesMethod): + """Subclass for creating instances in Kubernetes. + For Kubernetes, pods already exist, so this just validates connectivity. + """ + def __init__(self, base_command): + super(KubernetesCreateInstancesMethod, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesCreateInstancesMethod, self).update_ansible_vars_with_args(args) + self.extra_vars['connection_type'] = 'kubectl' + + # Extract namespace from runtime_args if provided + if hasattr(args, 'runtime_args') and args.runtime_args: + try: + runtime_args = json.loads(args.runtime_args) + if 'YB_NAMESPACE' in runtime_args: + args.namespace = runtime_args['YB_NAMESPACE'] + except (json.JSONDecodeError, TypeError) as e: + logging.warning(f"Failed to parse runtime_args: {str(e)}") + + host_info = self.cloud.get_host_info(args) + if host_info: + # Clear any pre-existing kubectl_container to avoid stale values + self.extra_vars.pop('kubectl_container', None) + + self.extra_vars['kubectl_pod'] = host_info['name'] + self.extra_vars['kubectl_namespace'] = host_info.get('namespace', 'default') + if host_info.get('container'): + self.extra_vars['kubectl_container'] = host_info['container'] + if hasattr(args, 'kubeconfig') and args.kubeconfig: + self.extra_vars['kubectl_kubeconfig'] = args.kubeconfig + + def callback(self, args): + # Pods are already created in Kubernetes, just update vars and validate + self.update_ansible_vars_with_args(args) + self.wait_for_host(args) + + +class KubernetesProvisionInstancesMethod(ProvisionInstancesMethod): + """Subclass for provisioning instances in Kubernetes. + Skips pre-provisioning since pods are pre-existing. + """ + def __init__(self, base_command): + super(KubernetesProvisionInstancesMethod, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesProvisionInstancesMethod, self).update_ansible_vars_with_args(args) + self.extra_vars['connection_type'] = 'kubectl' + + # Extract namespace from runtime_args if provided + if hasattr(args, 'runtime_args') and args.runtime_args: + try: + runtime_args = json.loads(args.runtime_args) + if 'YB_NAMESPACE' in runtime_args: + args.namespace = runtime_args['YB_NAMESPACE'] + except (json.JSONDecodeError, TypeError) as e: + logging.warning(f"Failed to parse runtime_args: {str(e)}") + + host_info = self.cloud.get_host_info(args) + if host_info: + # Clear any pre-existing kubectl_container to avoid stale values + self.extra_vars.pop('kubectl_container', None) + + self.extra_vars['kubectl_pod'] = host_info['name'] + self.extra_vars['kubectl_namespace'] = host_info.get('namespace', 'default') + if host_info.get('container'): + self.extra_vars['kubectl_container'] = host_info['container'] + if hasattr(args, 'kubeconfig') and args.kubeconfig: + self.extra_vars['kubectl_kubeconfig'] = args.kubeconfig + + def callback(self, args): + # For kubernetes, pods are pre-existing (managed by operator/helm) + args.skip_preprovision = True + super(KubernetesProvisionInstancesMethod, self).callback(args) + + +class KubernetesListInstancesMethod(ListInstancesMethod): + """Subclass for listing instances in Kubernetes. + """ + def __init__(self, base_command): + super(KubernetesListInstancesMethod, self).__init__(base_command) + + def add_extra_args(self): + super(KubernetesListInstancesMethod, self).add_extra_args() + self.parser.add_argument("--namespace", default="default", + help="Kubernetes namespace (default: default)") + + def callback(self, args): + logging.debug(f"Listing Kubernetes instances with args: {args}") + + host_info = self.cloud.get_host_info(args, get_all=args.as_json) + if not host_info: + return None + + if args.as_json: + print(json.dumps(host_info)) + else: + # Single host info dict + if isinstance(host_info, dict): + print('\n'.join([f"{k}={v}" for k, v in iteritems(host_info)])) + # List of host info dicts + else: + for info in host_info: + print('\n'.join([f"{k}={v}" for k, v in iteritems(info)])) + + +class KubernetesAccessAddKeyMethod(AbstractAccessMethod): + """Subclass for adding SSH keys in Kubernetes. + For Kubernetes with kubectl exec, SSH keys are less relevant but we provide basic support. + """ + def __init__(self, base_command): + super(KubernetesAccessAddKeyMethod, self).__init__(base_command, "add-key") + + def callback(self, args): + logging.info("Adding SSH key for Kubernetes pod access") + # For kubernetes, this is typically not needed as we use kubectl exec + # But we implement it for compatibility + print(json.dumps({"status": "success", "message": "Key management via kubectl"})) + + +def _setup_kubectl_connection(method_instance, args): + """Helper function to set up kubectl connection parameters in extra_vars. + This should be called by all Kubernetes methods that need remote execution. + """ + # Extract namespace from runtime_args if provided + if hasattr(args, 'runtime_args') and args.runtime_args: + try: + runtime_args = json.loads(args.runtime_args) + if 'YB_NAMESPACE' in runtime_args: + args.namespace = runtime_args['YB_NAMESPACE'] + except (json.JSONDecodeError, TypeError) as e: + logging.warning(f"Failed to parse runtime_args: {str(e)}") + + host_info = method_instance.cloud.get_host_info(args) + if host_info: + # Clear any pre-existing kubectl_container to avoid stale values + method_instance.extra_vars.pop('kubectl_container', None) + + method_instance.extra_vars['connection_type'] = 'kubectl' + method_instance.extra_vars['kubectl_pod'] = host_info['name'] + method_instance.extra_vars['kubectl_namespace'] = host_info.get('namespace', 'default') + if host_info.get('container'): + method_instance.extra_vars['kubectl_container'] = host_info['container'] + if hasattr(args, 'kubeconfig') and args.kubeconfig: + method_instance.extra_vars['kubectl_kubeconfig'] = args.kubeconfig + + +class KubernetesRunHooks(RunHooks): + """Kubernetes-specific RunHooks that uses kubectl exec instead of SSH.""" + def __init__(self, base_command): + super(KubernetesRunHooks, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesRunHooks, self).update_ansible_vars_with_args(args) + _setup_kubectl_connection(self, args) + + +class KubernetesConfigureInstancesMethod(ConfigureInstancesMethod): + """Kubernetes-specific ConfigureInstances that uses kubectl exec.""" + def __init__(self, base_command): + super(KubernetesConfigureInstancesMethod, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesConfigureInstancesMethod, self).update_ansible_vars_with_args(args) + _setup_kubectl_connection(self, args) + + +class KubernetesInitYSQLMethod(InitYSQLMethod): + """Kubernetes-specific InitYSQL that uses kubectl exec.""" + def __init__(self, base_command): + super(KubernetesInitYSQLMethod, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesInitYSQLMethod, self).update_ansible_vars_with_args(args) + _setup_kubectl_connection(self, args) + + +class KubernetesCronCheckMethod(CronCheckMethod): + """Kubernetes-specific CronCheck that uses kubectl exec.""" + def __init__(self, base_command): + super(KubernetesCronCheckMethod, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesCronCheckMethod, self).update_ansible_vars_with_args(args) + _setup_kubectl_connection(self, args) + + +class KubernetesTransferXClusterCerts(TransferXClusterCerts): + """Kubernetes-specific TransferXClusterCerts that uses kubectl exec.""" + def __init__(self, base_command): + super(KubernetesTransferXClusterCerts, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesTransferXClusterCerts, self).update_ansible_vars_with_args(args) + _setup_kubectl_connection(self, args) + + +class KubernetesWaitForConnection(WaitForConnection): + """Kubernetes-specific WaitForConnection that uses kubectl exec.""" + def __init__(self, base_command): + super(KubernetesWaitForConnection, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesWaitForConnection, self).update_ansible_vars_with_args(args) + _setup_kubectl_connection(self, args) + + +class KubernetesManageOtelCollector(ManageOtelCollector): + """Kubernetes-specific ManageOtelCollector that uses kubectl exec.""" + def __init__(self, base_command): + super(KubernetesManageOtelCollector, self).__init__(base_command) + + def update_ansible_vars_with_args(self, args): + """Override to set connection_type to kubectl.""" + super(KubernetesManageOtelCollector, self).update_ansible_vars_with_args(args) + _setup_kubectl_connection(self, args) diff --git a/managed/devops/opscli/ybops/cloud/ybcloud.py b/managed/devops/opscli/ybops/cloud/ybcloud.py index eae2eac82425..ded11d739fe3 100644 --- a/managed/devops/opscli/ybops/cloud/ybcloud.py +++ b/managed/devops/opscli/ybops/cloud/ybcloud.py @@ -17,6 +17,7 @@ from ybops.cloud.gcp.cloud import GcpCloud from ybops.cloud.onprem.cloud import OnPremCloud from ybops.cloud.azure.cloud import AzureCloud +from ybops.cloud.kubernetes.cloud import KubernetesCloud from ybops.cloud.common.base import AbstractCommandParser from ybops.utils import init_env, init_logging from ybops.common.exceptions import YBOpsExitCodeException @@ -38,6 +39,7 @@ def add_subcommands(self): self.add_subcommand(GcpCloud()) self.add_subcommand(OnPremCloud()) self.add_subcommand(AzureCloud()) + self.add_subcommand(KubernetesCloud()) def add_extra_args(self): """Setting up the top level flags for the entire program. diff --git a/managed/devops/opscli/ybops/utils/kubectl_shell.py b/managed/devops/opscli/ybops/utils/kubectl_shell.py new file mode 100644 index 000000000000..b9e99f4f17e2 --- /dev/null +++ b/managed/devops/opscli/ybops/utils/kubectl_shell.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python + +import logging +import subprocess + +from ybops.common.exceptions import YBOpsRecoverableError, YBOpsRuntimeError + + +class KubectlClient(object): + """KubectlClient class is used to run commands in Kubernetes pods using kubectl exec. + """ + + def __init__(self, pod_name, namespace="default", kubeconfig=None, container=None): + """ + Initialize kubectl client. + + Args: + pod_name: Name of the pod to execute commands in + namespace: Kubernetes namespace (default: "default") + kubeconfig: Path to kubeconfig file (optional) + container: Container name within the pod (optional) + """ + self.pod_name = pod_name + self.namespace = namespace + self.kubeconfig = kubeconfig + self.container = container + + def _build_kubectl_cmd(self, command): + """Build the base kubectl exec command.""" + kubectl_cmd = ["kubectl"] + + if self.kubeconfig: + kubectl_cmd.extend(["--kubeconfig", self.kubeconfig]) + + kubectl_cmd.extend([ + "exec", + "-n", self.namespace, + self.pod_name + ]) + + if self.container: + kubectl_cmd.extend(["-c", self.container]) + + kubectl_cmd.append("--") + + # Add the actual command + if isinstance(command, list): + kubectl_cmd.extend(command) + else: + kubectl_cmd.extend(["/bin/bash", "-c", command]) + + return kubectl_cmd + + def exec_command(self, command, output_only=False, **kwargs): + """ + Execute a command in the pod using kubectl exec. + + Args: + command: Command to execute (string or list) + output_only: If True, return only stdout. If False, return (rc, stdout, stderr) + **kwargs: Additional options (ignored for compatibility) + + Returns: + If output_only=True: stdout string + If output_only=False: tuple of (return_code, stdout, stderr) + """ + kubectl_cmd = self._build_kubectl_cmd(command) + logging.debug(f"Executing kubectl command: {' '.join(kubectl_cmd)}") + + try: + result = subprocess.run( + kubectl_cmd, + capture_output=True, + text=True, + timeout=kwargs.get('timeout', None) + ) + + if output_only: + if result.returncode != 0: + raise YBOpsRecoverableError( + f"Command '{command}' failed with return code {result.returncode} and error: {result.stderr}") + return result.stdout + else: + # Log the output for YBA to capture + if result.stdout: + logging.info(result.stdout) + if result.stderr: + logging.warning(result.stderr) + + # Return stdout/stderr as lists of lines (compatible with SSH implementation) + stdout_lines = result.stdout.splitlines(keepends=True) if result.stdout else [] + stderr_lines = result.stderr.splitlines(keepends=True) if result.stderr else [] + return result.returncode, stdout_lines, stderr_lines + + except subprocess.TimeoutExpired: + raise YBOpsRecoverableError(f"Command timed out: {command}") + except Exception as e: + raise YBOpsRecoverableError(f"Failed to execute command: {str(e)}") + + def exec_script(self, local_script_name, params): + """ + Execute a local script on the remote pod. + + Args: + local_script_name: Path to local script file + params: Parameters to pass to the script (string or list) + + Returns: + stdout of the script execution + """ + if not isinstance(params, str): + params = ' '.join(params) + + with open(local_script_name, "r") as f: + local_script = f.read() + + # Use heredoc syntax to pass script content + command = f"/bin/bash -s {params} <<'EOF'\n{local_script}\nEOF" + + return self.exec_command(command, output_only=True) + + def upload_file_to_remote_server(self, local_path, remote_path, **kwargs): + """ + Upload a file to the pod using kubectl cp. + + Args: + local_path: Path to local file + remote_path: Destination path in the pod + **kwargs: Additional options (chmod, etc.) + """ + kubectl_cmd = ["kubectl"] + + if self.kubeconfig: + kubectl_cmd.extend(["--kubeconfig", self.kubeconfig]) + + kubectl_cmd.extend(["cp", "-n", self.namespace]) + + if self.container: + kubectl_cmd.extend(["-c", self.container]) + + kubectl_cmd.extend([local_path, f"{self.pod_name}:{remote_path}"]) + + logging.debug(f"Copying file with command: {' '.join(kubectl_cmd)}") + + try: + result = subprocess.run( + kubectl_cmd, + capture_output=True, + text=True, + check=True + ) + + if result.stderr: + logging.warning(f"kubectl cp stderr: {result.stderr}") + + logging.info(f"kubectl cp completed with rc={result.returncode}") + + # Apply chmod if specified + if kwargs.get('chmod'): + # Mask out file type bits, keep permission bits, and ensure execution bit is set + permissions = (kwargs.get('chmod') & 0o7777) | 0o0111 + chmod_cmd = f"chmod {permissions:o} {remote_path}" + logging.debug(f"Applying chmod: {chmod_cmd}") + self.exec_command(chmod_cmd, output_only=True) + + except subprocess.CalledProcessError as e: + logging.error(f"kubectl cp failed: stdout={e.stdout}, stderr={e.stderr}, rc={e.returncode}") + raise YBOpsRuntimeError(f"Failed to copy file {local_path} to pod: {e.stderr}") + except Exception as e: + logging.error(f"Unexpected error during kubectl cp: {str(e)}") + raise + + def download_file_from_remote_server(self, remote_file_name, local_file_name, **kwargs): + """ + Download a file from the pod using kubectl cp. + + Args: + remote_file_name: Path to file in the pod + local_file_name: Destination path on local filesystem + **kwargs: Additional options + """ + kubectl_cmd = ["kubectl"] + + if self.kubeconfig: + kubectl_cmd.extend(["--kubeconfig", self.kubeconfig]) + + kubectl_cmd.extend(["cp", "-n", self.namespace]) + + if self.container: + kubectl_cmd.extend(["-c", self.container]) + + kubectl_cmd.extend([f"{self.pod_name}:{remote_file_name}", local_file_name]) + + logging.debug(f"Downloading file with command: {' '.join(kubectl_cmd)}") + + try: + subprocess.run( + kubectl_cmd, + capture_output=True, + text=True, + check=True + ) + + logging.info(f"Successfully downloaded {self.pod_name}:{remote_file_name} to {local_file_name}") + + except subprocess.CalledProcessError as e: + raise YBOpsRuntimeError(f"Failed to download file {remote_file_name} from pod: {e.stderr}") + + def close_connection(self): + """Close connection (no-op for kubectl, but kept for compatibility).""" + pass diff --git a/managed/devops/opscli/ybops/utils/remote_shell.py b/managed/devops/opscli/ybops/utils/remote_shell.py index 6678548c8e36..f7187ca6162e 100644 --- a/managed/devops/opscli/ybops/utils/remote_shell.py +++ b/managed/devops/opscli/ybops/utils/remote_shell.py @@ -16,6 +16,7 @@ from ybops.common.exceptions import YBOpsRecoverableError, YBOpsRuntimeError from ybops.utils.ssh import SSHClient from ybops.node_agent.rpc import RpcClient +from ybops.utils.kubectl_shell import KubectlClient CONNECTION_ATTEMPTS = 5 CONNECTION_ATTEMPT_DELAY_SEC = 3 @@ -116,6 +117,10 @@ def get_host_port_user(connect_options): connect_options['host'] = connect_options['node_agent_ip'] connect_options['port'] = connect_options['node_agent_port'] connect_options['user'] = connect_options['node_agent_user'] + elif connection_type == 'kubectl': + connect_options['host'] = connect_options['kubectl_pod'] + connect_options['port'] = None # Not applicable for kubectl + connect_options['user'] = connect_options.get('kubectl_user', 'yugabyte') else: raise YBOpsRuntimeError("Unknown connection_type '{}'".format(connection_type)) return connect_options @@ -150,6 +155,12 @@ class RemoteShell(object): node_agent_port - Node agent port. node_agent_cert_path - Path to node agent cert. node_agent_auth_token - JWT to authenticate the client. + For Kubectl: + connection_type - set to kubectl to enable Kubernetes pod exec. + kubectl_pod - Pod name. + kubectl_namespace - Kubernetes namespace (default: "default"). + kubectl_kubeconfig - Path to kubeconfig file (optional). + kubectl_container - Container name within pod (optional). """ @@ -159,6 +170,8 @@ def __init__(self, connect_options): self.delegate = _SshRemoteShell(connect_options) elif connection_type == 'node_agent_rpc': self.delegate = _RpcRemoteShell(connect_options) + elif connection_type == 'kubectl': + self.delegate = _KubectlRemoteShell(connect_options) else: raise YBOpsRuntimeError("Unknown connection_type '{}'".format(connection_type)) @@ -367,3 +380,79 @@ def invoke_method(self, param, **kwargs): result.stderr) ) return result.obj + + +class _KubectlRemoteShell(object): + """_KubectlRemoteShell class is used to run remote shell commands in Kubernetes pods + using kubectl exec. + """ + + def __init__(self, connect_options): + assert connect_options.get("kubectl_pod") is not None, 'kubectl_pod is required' + + self.kubectl_conn = KubectlClient( + pod_name=connect_options.get("kubectl_pod"), + namespace=connect_options.get("kubectl_namespace", "default"), + kubeconfig=connect_options.get("kubectl_kubeconfig"), + container=connect_options.get("kubectl_container") + ) + self.connect_options = connect_options + self.connected = True + + def get_host_port_user(self): + return get_host_port_user(self.connect_options) + + def close(self): + if self.connected: + self.kubectl_conn.close_connection() + + def run_command_raw(self, command, **kwargs): + result = RemoteShellOutput() + try: + kwargs.setdefault('output_only', True) + output = self.kubectl_conn.exec_command(command, **kwargs) + result.stdout = output + result.exited = 0 + except Exception as e: + result.stderr = str(e) + result.exited = 1 + + return result + + def run_command(self, command, **kwargs): + result = self.run_command_raw(command, **kwargs) + + if result.exited: + cmd = ' '.join(command).encode('utf-8') if isinstance(command, list) else command + raise YBOpsRecoverableError( + f"Remote shell command '{cmd}' failed with " + f"return code '{result.exited}' and error '{result.stderr}'" + ) + return result + + def exec_command(self, command, **kwargs): + output_only = kwargs.get('output_only', False) + if output_only: + result = self.run_command(command, **kwargs) + return result.stdout + else: + # This returns rc, stdout, stderr. + return self.kubectl_conn.exec_command(command, **kwargs) + + def exec_script(self, local_script_name, params): + return self.kubectl_conn.exec_script(local_script_name, params) + + def put_file(self, local_path, remote_path, **kwargs): + self.kubectl_conn.upload_file_to_remote_server(local_path, remote_path, **kwargs) + + # Checks if the file exists on the remote, and if not, it puts it there. + def put_file_if_not_exists(self, local_path, remote_path, file_name, **kwargs): + result = self.run_command('ls ' + remote_path, **kwargs) + if file_name not in result.stdout: + self.put_file(local_path, os.path.join(remote_path, file_name), **kwargs) + + def fetch_file(self, remote_file_name, local_file_name, **kwargs): + self.kubectl_conn.download_file_from_remote_server(remote_file_name, local_file_name, **kwargs) + + def invoke_method(self, input, **kwargs): + raise NotImplementedError("kubectl does not support method invocation") diff --git a/managed/devops/vars/cloud/kubernetes.yml b/managed/devops/vars/cloud/kubernetes.yml new file mode 100644 index 000000000000..e69de29bb2d1