diff --git a/src/kube_galaxy/cmd/status.py b/src/kube_galaxy/cmd/status.py index 9964a08..6441f2a 100644 --- a/src/kube_galaxy/cmd/status.py +++ b/src/kube_galaxy/cmd/status.py @@ -1,11 +1,21 @@ """Status command handler.""" import shutil +from collections.abc import Callable import typer +from kube_galaxy.pkg.utils.client import ( + get_cluster_info, + get_context, + get_nodes, + get_pods, + wait_for_nodes, + wait_for_pods, +) +from kube_galaxy.pkg.utils.errors import ClusterError from kube_galaxy.pkg.utils.logging import error, info, print_dict, section, success, warning -from kube_galaxy.pkg.utils.shell import ShellError, run +from kube_galaxy.pkg.utils.shell import run def status(wait: bool = False, timeout: int = 300) -> None: @@ -25,7 +35,6 @@ def _print_dependency_status() -> None: info("Dependencies:") deps = { "kubectl": _check_command("kubectl"), - "kubeadm": _check_command("kubeadm"), "spread": _check_command("spread"), } print_dict(deps) @@ -39,22 +48,17 @@ def _print_cluster_context() -> None: info("") try: - result = run(["kubectl", "config", "current-context"], capture_output=True, check=False) - context = result.stdout.strip() if result.returncode == 0 else "none" + context = get_context() info(f"Active Cluster: {context}") - except Exception: - info("Active Cluster: error checking") - - try: - result = run(["kubectl", "get", "nodes"], capture_output=True, check=False) - if result.returncode == 0 and result.stdout: - lines = result.stdout.strip().split("\n") + nodes_output = get_nodes() + if nodes_output: + lines = nodes_output.strip().split("\n") info(f"Cluster Nodes: {len(lines) - 1}") for line in lines[1:]: if line: info(f" {line}") - except Exception: - pass + except ClusterError: + info("Active Cluster: error checking") def _verify_cluster_health(timeout: int) -> None: @@ -63,52 +67,31 @@ def _verify_cluster_health(timeout: int) -> None: error("kubectl is required for --wait health checks", show_traceback=False) raise typer.Exit(code=1) - timeout_arg = f"--timeout={timeout}s" section("Cluster Health Verification") info("Waiting for nodes to be Ready...") try: - run( - ["kubectl", "wait", "--for=condition=Ready", "node", "--all", timeout_arg], - capture_output=True, - ) - run( - [ - "kubectl", - "wait", - "--for=condition=Ready", - "pod", - "--all", - "-n", - "kube-system", - timeout_arg, - ], - capture_output=True, - ) - except ShellError as exc: - if exc.stderr.strip(): - error(exc.stderr.strip(), show_traceback=False) + wait_for_nodes(timeout=timeout) + wait_for_pods(namespace="kube-system", timeout=timeout) + except ClusterError as exc: + error(str(exc), show_traceback=False) error("Cluster readiness checks failed", show_traceback=False) raise typer.Exit(code=1) from exc - _print_command_output(["kubectl", "cluster-info"], "Cluster Info") - _print_command_output(["kubectl", "get", "nodes", "-o", "wide"], "Nodes") - _print_command_output(["kubectl", "get", "pods", "-A", "-o", "wide"], "Pods") + _print_command_output(get_cluster_info, "Cluster Info") + _print_command_output(get_nodes, "Nodes") + _print_command_output(get_pods, "Pods") -def _print_command_output(command: list[str], title: str) -> None: +def _print_command_output(command: Callable[[], str], title: str) -> None: """Run command and print its output with a section label.""" info("") info(f"{title}:") try: - result = run(command, capture_output=True) - output = result.stdout.strip() - if output: + if output := command().strip(): info(output) - except ShellError as exc: - if exc.stderr.strip(): - error(exc.stderr.strip(), show_traceback=False) - error(f"Failed to run: {' '.join(command)}", show_traceback=False) + except ClusterError as exc: + error(f"Failed to run: {title}", show_traceback=False) raise typer.Exit(code=1) from exc @@ -122,12 +105,6 @@ def _check_command(cmd: str) -> str: capture_output=True, check=False, ) - elif cmd == "kubeadm": - result = run( - [cmd, "version"], - capture_output=True, - check=False, - ) else: result = run( [cmd, "--version"], diff --git a/src/kube_galaxy/cmd/test.py b/src/kube_galaxy/cmd/test.py index 03c0a9b..26524c4 100644 --- a/src/kube_galaxy/cmd/test.py +++ b/src/kube_galaxy/cmd/test.py @@ -1,6 +1,5 @@ """Test command handler.""" -import subprocess from pathlib import Path import typer @@ -9,6 +8,8 @@ from kube_galaxy.pkg.manifest.loader import load_manifest from kube_galaxy.pkg.manifest.validator import validate_manifest from kube_galaxy.pkg.testing.spread import collect_test_results, run_spread_tests +from kube_galaxy.pkg.utils.client import get_context, verify_connectivity +from kube_galaxy.pkg.utils.errors import ClusterError from kube_galaxy.pkg.utils.logging import error, exception, info, section, success, warning @@ -22,25 +23,10 @@ def spread(manifest_path: str) -> None: try: # Check if kubectl can connect - result = subprocess.run( - ["kubectl", "cluster-info"], - capture_output=True, - text=True, - check=False, - ) - if result.returncode != 0: - error("No Kubernetes cluster available. Please set up a cluster first.") - info("You can create a test cluster with: kube-galaxy setup") - raise typer.Exit(code=1) + verify_connectivity() # Get cluster context - result = subprocess.run( - ["kubectl", "config", "current-context"], - capture_output=True, - text=True, - check=True, - ) - cluster_context = result.stdout.strip() + cluster_context = get_context() success(f"Connected to cluster: {cluster_context}") # Run spread tests from manifest @@ -57,7 +43,7 @@ def spread(manifest_path: str) -> None: success("Spread tests completed") - except Exception as e: + except ClusterError as e: exception("Spread tests failed", e) raise typer.Exit(code=1) from e diff --git a/src/kube_galaxy/pkg/components/_base.py b/src/kube_galaxy/pkg/components/_base.py index 15fd5a1..7bc8164 100644 --- a/src/kube_galaxy/pkg/components/_base.py +++ b/src/kube_galaxy/pkg/components/_base.py @@ -18,13 +18,14 @@ from kube_galaxy.pkg.arch.detector import ArchInfo from kube_galaxy.pkg.literals import Commands, Permissions, SystemPaths, Timeouts from kube_galaxy.pkg.manifest.models import ComponentConfig, InstallMethod, Manifest +from kube_galaxy.pkg.utils.client import apply_manifest from kube_galaxy.pkg.utils.components import ( download_file, extract_archive, format_component_pattern, install_binary, ) -from kube_galaxy.pkg.utils.errors import ComponentError +from kube_galaxy.pkg.utils.errors import ClusterError, ComponentError from kube_galaxy.pkg.utils.logging import info from kube_galaxy.pkg.utils.shell import run @@ -243,8 +244,10 @@ def bootstrap_hook(self) -> None: raise ComponentError( f"{comp_name} manifest not downloaded. Run download hook first." ) - run(["kubectl", "apply", "-f", str(self.manifest_path)], check=True) - info(f"Applied manifest for {comp_name}") + try: + apply_manifest(self.manifest_path) + except ClusterError as e: + raise ComponentError(f"Failed to apply manifest for {comp_name}") from e pass diff --git a/src/kube_galaxy/pkg/components/kubeadm.py b/src/kube_galaxy/pkg/components/kubeadm.py index 331e121..366d476 100644 --- a/src/kube_galaxy/pkg/components/kubeadm.py +++ b/src/kube_galaxy/pkg/components/kubeadm.py @@ -14,6 +14,11 @@ from kube_galaxy.pkg.components import ClusterComponentBase, register_component from kube_galaxy.pkg.literals import Commands, SystemPaths, URLs +from kube_galaxy.pkg.utils.client import ( + get_api_server_status, + verify_connectivity, + wait_for_nodes, +) from kube_galaxy.pkg.utils.errors import ComponentError from kube_galaxy.pkg.utils.logging import info from kube_galaxy.pkg.utils.shell import run @@ -184,26 +189,9 @@ def verify_hook(self) -> None: Checks cluster connectivity and waits for nodes/pods to be ready. """ - - # Check cluster info - run(["kubectl", "cluster-info"], check=True) - - # Wait for nodes to be ready - run( - ["kubectl", "wait", "--for=condition=Ready", "nodes", "--all", "--timeout=300s"], - check=True, - ) - - # Wait for api-server to be ready - run( - [ - "kubectl", - "get", - "--raw=/readyz", - "--request-timeout=300s", - ], - check=True, - ) + verify_connectivity() + wait_for_nodes(timeout=300) + get_api_server_status(timeout=300) def stop_hook(self) -> None: """ diff --git a/src/kube_galaxy/pkg/testing/spread.py b/src/kube_galaxy/pkg/testing/spread.py index e969560..aef1f3f 100644 --- a/src/kube_galaxy/pkg/testing/spread.py +++ b/src/kube_galaxy/pkg/testing/spread.py @@ -18,6 +18,7 @@ task_path_for_component, validate_component_test_structure, ) +from kube_galaxy.pkg.utils.client import create_namespace, delete_namespace, verify_connectivity from kube_galaxy.pkg.utils.errors import ClusterError from kube_galaxy.pkg.utils.logging import error, info, section, success, warning from kube_galaxy.pkg.utils.shell import ShellError, run @@ -111,9 +112,7 @@ def run_spread_tests( def _verify_test_prerequisites() -> None: """Verify kubectl and spread are available.""" try: - info("Verifying cluster connectivity...") - run(["kubectl", "cluster-info"], check=True, capture_output=True) - success("Connected to Kubernetes cluster") + verify_connectivity() # Check for spread info("Verifying spread test framework...") @@ -147,54 +146,12 @@ def _create_test_namespace(component_name: str) -> str: # Normalize component name for namespace (lowercase, hyphens only) namespace = f"kube-galaxy-test-{component_name.lower().replace('_', '-')}" - try: - info(f" Creating test namespace: {namespace}") - - # Apply with labels - run(["kubectl", "create", "namespace", namespace], check=True) - - # Label namespace - label = "app.kubernetes.io/managed-by=kube-galaxy" - run( - ["kubectl", "label", "namespace", namespace, label, f"component={component_name}"], - check=True, - ) - - success(f"Namespace created: {namespace}") - return namespace - - except ShellError as exc: - raise ClusterError(f"Failed to create namespace {namespace}: {exc}") from exc - - -def _cleanup_test_namespace(namespace: str, timeout: int = 60) -> None: - """ - Delete test namespace and wait for termination. - - Args: - namespace: Namespace to delete - timeout: Maximum seconds to wait for deletion - - Raises: - ClusterError: If namespace deletion fails - """ - try: - info(f" Cleaning up namespace: {namespace}") - - # Delete namespace - run( - ["kubectl", "delete", "namespace", namespace, "--timeout", f"{timeout}s"], - check=True, - ) - - success(f"Namespace deleted: {namespace}") - - except ShellError as exc: - # Don't fail if namespace doesn't exist - if "not found" in str(exc): - warning(f" Namespace {namespace} not found (may already be deleted)") - else: - raise ClusterError(f"Failed to delete namespace {namespace}: {exc}") from exc + labels = { + "app.kubernetes.io/managed-by": "kube-galaxy", + "component": component_name, + } + create_namespace(namespace, labels) + return namespace def _generate_orchestration_spread_yaml( @@ -350,11 +307,7 @@ def _run_component_tests( raise ClusterError("Component validation failed") # Generate orchestration spread.yaml - try: - component_suites = _generate_orchestration_spread_yaml(spread_components, kubeconfig) - except ClusterError as exc: - error(f"Failed to generate orchestration spread.yaml: {exc}") - raise + component_suites = _generate_orchestration_spread_yaml(spread_components, kubeconfig) # Track test results test_results = [] @@ -412,7 +365,7 @@ def _run_component_tests( # Step 4: Cleanup namespace (always executed) if namespace: try: - _cleanup_test_namespace(namespace) + delete_namespace(namespace) except Exception as cleanup_exc: warning(f" Namespace cleanup failed: {cleanup_exc}") diff --git a/src/kube_galaxy/pkg/utils/client.py b/src/kube_galaxy/pkg/utils/client.py new file mode 100644 index 0000000..90ffea2 --- /dev/null +++ b/src/kube_galaxy/pkg/utils/client.py @@ -0,0 +1,388 @@ +"""Kubernetes client operations wrapper.""" + +import json +import shutil +from pathlib import Path +from typing import Any + +from kube_galaxy.pkg.utils.errors import ClusterError +from kube_galaxy.pkg.utils.logging import info, success, warning +from kube_galaxy.pkg.utils.shell import ShellError, run + + +def verify_connectivity() -> None: + """ + Verify kubectl connectivity to Kubernetes cluster. + + Raises: + ClusterError: If kubectl is not available or cannot connect to cluster + """ + if not shutil.which("kubectl"): + raise ClusterError("kubectl not found in PATH") + + try: + info("Verifying cluster connectivity...") + run(["kubectl", "cluster-info"], check=True, capture_output=True) + success("Connected to Kubernetes cluster") + except ShellError as exc: + raise ClusterError(f"Failed to connect to cluster: {exc}") from exc + + +def get_context() -> str: + """ + Get the current Kubernetes context. + + Returns: + Current context name + + Raises: + ClusterError: If context cannot be determined + """ + try: + result = run( + ["kubectl", "config", "current-context"], check=True, capture_output=True, text=True + ) + return result.stdout.strip() + except ShellError as exc: + raise ClusterError(f"Failed to get current context: {exc}") from exc + + +def wait_for_nodes(timeout: int = 300, condition: str = "Ready") -> None: + """ + Wait for all nodes to reach a specified condition. + + Args: + timeout: Maximum seconds to wait + condition: Node condition to wait for (default: Ready) + + Raises: + ClusterError: If nodes do not reach condition within timeout + """ + try: + info(f"Waiting for nodes to be {condition}...") + run( + [ + "kubectl", + "wait", + f"--for=condition={condition}", + "nodes", + "--all", + f"--timeout={timeout}s", + ], + check=True, + capture_output=True, + ) + success(f"All nodes are {condition}") + except ShellError as exc: + raise ClusterError(f"Nodes failed to reach {condition} condition: {exc}") from exc + + +def wait_for_pods( + namespace: str = "kube-system", timeout: int = 300, condition: str = "Ready" +) -> None: + """ + Wait for pods in a namespace to reach a specified condition. + + Args: + namespace: Kubernetes namespace to monitor (default: kube-system) + timeout: Maximum seconds to wait + condition: Pod condition to wait for (default: Ready) + + Raises: + ClusterError: If pods do not reach condition within timeout + """ + try: + info(f"Waiting for pods in {namespace} to be {condition}...") + run( + [ + "kubectl", + "wait", + f"--for=condition={condition}", + "pod", + "--all", + "-n", + namespace, + f"--timeout={timeout}s", + ], + check=True, + capture_output=True, + ) + success(f"Pods in {namespace} are {condition}") + except ShellError as exc: + raise ClusterError(f"Pods in {namespace} failed to reach {condition}: {exc}") from exc + + +def get_api_server_status(timeout: int = 300) -> None: + """ + Check API server readiness via /readyz endpoint. + + Args: + timeout: Maximum seconds to wait + + Raises: + ClusterError: If API server is not ready + """ + try: + info("Checking API server readiness...") + run( + [ + "kubectl", + "get", + "--raw=/readyz", + f"--request-timeout={timeout}s", + ], + check=True, + capture_output=True, + ) + success("API server is ready") + except ShellError as exc: + raise ClusterError(f"API server not ready: {exc}") from exc + + +def get_cluster_info() -> str: + """ + Get cluster information. + + Returns: + Cluster info as string + + Raises: + ClusterError: If cluster info cannot be retrieved + """ + try: + result = run(["kubectl", "cluster-info"], check=True, capture_output=True, text=True) + return result.stdout + except ShellError as exc: + raise ClusterError(f"Failed to retrieve cluster info: {exc}") from exc + + +def get_nodes(wide: bool = False) -> str: + """ + Get nodes information. + + Args: + wide: Return wide output (includes internal IP, kernel version, etc.) + + Returns: + Node information as string + + Raises: + ClusterError: If node info cannot be retrieved + """ + try: + cmd = ["kubectl", "get", "nodes"] + if wide: + cmd.append("-o") + cmd.append("wide") + result = run(cmd, check=True, capture_output=True, text=True) + return result.stdout + except ShellError as exc: + raise ClusterError(f"Failed to retrieve nodes: {exc}") from exc + + +def get_pods(namespace: str = "", wide: bool = False, output_format: str = "") -> str: + """ + Get pods information. + + Args: + namespace: Kubernetes namespace (empty = all namespaces) + wide: Return wide output + output_format: Output format (json, yaml, etc.) + + Returns: + Pod information as string + + Raises: + ClusterError: If pod info cannot be retrieved + """ + try: + cmd = ["kubectl", "get", "pods"] + + if not namespace: + cmd.append("-A") + else: + cmd.extend(["-n", namespace]) + + if wide: + cmd.extend(["-o", "wide"]) + elif output_format: + cmd.extend(["-o", output_format]) + + result = run(cmd, check=True, capture_output=True, text=True) + return result.stdout + except ShellError as exc: + raise ClusterError(f"Failed to retrieve pods: {exc}") from exc + + +def get_pod_data_json(namespace: str = "") -> list[dict[str, Any]]: + """ + Get pods information as JSON for structured parsing. + + Args: + namespace: Kubernetes namespace (empty = all namespaces) + + Returns: + List of pod dictionaries + + Raises: + ClusterError: If pod data cannot be retrieved + """ + try: + cmd = ["kubectl", "get", "pods"] + if not namespace: + cmd.append("-A") + else: + cmd.extend(["-n", namespace]) + cmd.extend(["-o", "json"]) + + result = run(cmd, check=True, capture_output=True, text=True) + data = json.loads(result.stdout) + items: list[dict[str, Any]] = data.get("items", []) + return items + except (ShellError, json.JSONDecodeError) as exc: + raise ClusterError(f"Failed to retrieve pods data: {exc}") from exc + + +def describe_nodes() -> str: + """ + Get detailed node descriptions. + + Returns: + Node descriptions as string + + Raises: + ClusterError: If descriptions cannot be retrieved + """ + try: + result = run(["kubectl", "describe", "nodes"], check=True, capture_output=True, text=True) + return result.stdout + except ShellError as exc: + raise ClusterError(f"Failed to describe nodes: {exc}") from exc + + +def get_events(namespace: str = "", all_namespaces: bool = True) -> str: + """ + Get Kubernetes events. + + Args: + namespace: Specific namespace (ignored if all_namespaces=True) + all_namespaces: Get events from all namespaces (default: True) + + Returns: + Events as string + + Raises: + ClusterError: If events cannot be retrieved + """ + try: + cmd = ["kubectl", "get", "events"] + if all_namespaces: + cmd.append("-A") + elif namespace: + cmd.extend(["-n", namespace]) + + result = run(cmd, check=True, capture_output=True, text=True) + return result.stdout + except ShellError as exc: + raise ClusterError(f"Failed to retrieve events: {exc}") from exc + + +def get_pod_logs(namespace: str, pod_name: str, tail: int = 100) -> str: + """ + Get logs from a specific pod. + + Args: + namespace: Kubernetes namespace + pod_name: Pod name + tail: Number of lines to retrieve from end of logs + + Returns: + Pod logs as string. Returns empty string if pod has no logs. + """ + result = run( + ["kubectl", "logs", "-n", namespace, pod_name, f"--tail={tail}"], + check=False, + capture_output=True, + text=True, + ) + # Non-zero exit is OK if pod has no logs; return empty + return result.stdout if result.returncode == 0 else "" + + +def create_namespace(name: str, labels: dict[str, str] | None = None) -> None: + """ + Create a Kubernetes namespace with optional labels. + + Args: + name: Namespace name + labels: Optional dict of labels to apply + + Raises: + ClusterError: If namespace creation fails + """ + try: + info(f"Creating namespace: {name}") + run(["kubectl", "create", "namespace", name], check=True, capture_output=True) + + if labels: + label_strs = [f"{k}={v}" for k, v in labels.items()] + run( + ["kubectl", "label", "namespace", name, *label_strs], + check=True, + capture_output=True, + ) + success(f"Namespace created with labels: {name}") + else: + success(f"Namespace created: {name}") + + except ShellError as exc: + raise ClusterError(f"Failed to create namespace {name}: {exc}") from exc + + +def delete_namespace(name: str, timeout: int = 60) -> None: + """ + Delete a Kubernetes namespace with timeout. + + Args: + name: Namespace name + timeout: Maximum seconds to wait for deletion + + Raises: + ClusterError: If namespace deletion fails (actual error, not not-found) + """ + try: + info(f"Deleting namespace: {name}") + run( + ["kubectl", "delete", "namespace", name, "--timeout", f"{timeout}s"], + check=True, + capture_output=True, + ) + success(f"Namespace deleted: {name}") + except ShellError as exc: + # Don't fail if namespace doesn't exist + if "not found" in str(exc).lower(): + warning(f"Namespace {name} not found (may already be deleted)") + else: + raise ClusterError(f"Failed to delete namespace {name}: {exc}") from exc + + +def apply_manifest(manifest_path: Path | str) -> None: + """ + Apply a Kubernetes manifest file. + + Args: + manifest_path: Path to manifest file + + Raises: + ClusterError: If manifest application fails + """ + manifest_path = Path(manifest_path) + if not manifest_path.exists(): + raise ClusterError(f"Manifest not found: {manifest_path}") + + try: + info(f"Applying manifest: {manifest_path.name}") + run(["kubectl", "apply", "-f", str(manifest_path)], check=True, capture_output=True) + success(f"Manifest applied: {manifest_path.name}") + except ShellError as exc: + raise ClusterError(f"Failed to apply manifest {manifest_path}: {exc}") from exc diff --git a/src/kube_galaxy/pkg/utils/logs.py b/src/kube_galaxy/pkg/utils/logs.py index da5e081..64bc71f 100644 --- a/src/kube_galaxy/pkg/utils/logs.py +++ b/src/kube_galaxy/pkg/utils/logs.py @@ -1,12 +1,19 @@ """Kubernetes log collection and debugging utilities.""" -import json from datetime import datetime from pathlib import Path +from kube_galaxy.pkg.utils.client import ( + describe_nodes, + get_cluster_info, + get_events, + get_nodes, + get_pod_data_json, + get_pod_logs, + get_pods, +) from kube_galaxy.pkg.utils.errors import ClusterError from kube_galaxy.pkg.utils.logging import info, section, success, warning -from kube_galaxy.pkg.utils.shell import ShellError, run def collect_kubernetes_logs(output_dir: str = "debug-logs") -> str: @@ -56,10 +63,10 @@ def _collect_cluster_info(output_path: Path) -> None: info("Collecting cluster information...") try: - result = run(["kubectl", "cluster-info"], capture_output=True, text=True, check=True) - (output_path / "cluster-info.txt").write_text(result.stdout) + cluster_info = get_cluster_info() + (output_path / "cluster-info.txt").write_text(cluster_info) success(" Cluster info saved") - except ShellError as exc: + except ClusterError as exc: warning(f" Failed to collect cluster info: {exc}") @@ -69,25 +76,15 @@ def _collect_node_info(output_path: Path) -> None: try: # Get node descriptions - result = run( - ["kubectl", "describe", "nodes"], - capture_output=True, - text=True, - check=True, - ) - (output_path / "nodes-describe.txt").write_text(result.stdout) + node_descriptions = describe_nodes() + (output_path / "nodes-describe.txt").write_text(node_descriptions) # Get node status - result = run( - ["kubectl", "get", "nodes", "-o", "wide"], - capture_output=True, - text=True, - check=True, - ) - (output_path / "nodes-status.txt").write_text(result.stdout) + node_status = get_nodes(wide=True) + (output_path / "nodes-status.txt").write_text(node_status) success(" Node info saved") - except ShellError as exc: + except ClusterError as exc: warning(f" Failed to collect node info: {exc}") @@ -100,40 +97,25 @@ def _collect_pod_logs(output_path: Path) -> None: try: # Get all pods - result = run( - ["kubectl", "get", "pods", "-A", "-o", "json"], - capture_output=True, - text=True, - check=True, - ) - - pods_data = json.loads(result.stdout) + pods_data = get_pod_data_json() pod_count = 0 - for pod_item in pods_data.get("items", []): + for pod_item in pods_data: namespace = pod_item["metadata"]["namespace"] pod_name = pod_item["metadata"]["name"] - try: - # Get pod logs - log_result = run( - ["kubectl", "logs", "-n", namespace, pod_name, "--tail=100"], - capture_output=True, - text=True, - ) + # Get pod logs + log_content = get_pod_logs(namespace, pod_name, tail=100) - log_dir = pods_dir / namespace / pod_name - log_dir.mkdir(parents=True, exist_ok=True) - (log_dir / "logs.txt").write_text(log_result.stdout) + log_dir = pods_dir / namespace / pod_name + log_dir.mkdir(parents=True, exist_ok=True) + (log_dir / "logs.txt").write_text(log_content) - pod_count += 1 - except ShellError: - # Pod might not have logs, skip - pass + pod_count += 1 success(f" Pod logs saved ({pod_count} pods)") - except ShellError as exc: + except ClusterError as exc: warning(f" Failed to collect pod logs: {exc}") @@ -142,15 +124,10 @@ def _collect_events(output_path: Path) -> None: info("Collecting events...") try: - result = run( - ["kubectl", "get", "events", "-A"], - capture_output=True, - text=True, - check=True, - ) - (output_path / "events.txt").write_text(result.stdout) + events = get_events(all_namespaces=True) + (output_path / "events.txt").write_text(events) success(" Events saved") - except ShellError as exc: + except ClusterError as exc: warning(f" Failed to collect events: {exc}") @@ -160,25 +137,13 @@ def _collect_system_logs(output_path: Path) -> None: try: # Get kube-system namespace pods - result = run( - ["kubectl", "get", "pods", "-n", "kube-system", "-o", "wide"], - capture_output=True, - text=True, - check=True, - ) - (output_path / "kube-system-pods.txt").write_text(result.stdout) - - # Get kube-system namespace events - result = run( - ["kubectl", "get", "events", "-n", "kube-system"], - capture_output=True, - text=True, - check=True, - ) - (output_path / "kube-system-events.txt").write_text(result.stdout) + pods_str = get_pods(namespace="kube-system", wide=True) + (output_path / "kube-system-pods.txt").write_text(pods_str) + # Events are now collected from all namespaces in _collect_events() + # so we don't need a separate kube-system events call success(" System logs saved") - except ShellError as exc: + except ClusterError as exc: warning(f" Failed to collect system logs: {exc}") diff --git a/tests/unit/components/test_container_manifest.py b/tests/unit/components/test_container_manifest.py index bc671bb..f4f6e95 100644 --- a/tests/unit/components/test_container_manifest.py +++ b/tests/unit/components/test_container_manifest.py @@ -168,21 +168,19 @@ def test_bootstrap_hook_applies_manifest(component, monkeypatch, tmp_path): manifest_path.write_text("apiVersion: v1\nkind: ConfigMap\n") component.manifest_path = manifest_path - run_calls = [] + apply_manifest_calls = [] - def fake_run(cmd, **kwargs): - run_calls.append((list(cmd), kwargs)) + def fake_apply_manifest(path): + apply_manifest_calls.append(path) - monkeypatch.setattr("kube_galaxy.pkg.components._base.run", fake_run) + monkeypatch.setattr("kube_galaxy.pkg.components._base.apply_manifest", fake_apply_manifest) # Call bootstrap hook component.bootstrap_hook() - # Verify kubectl apply was called - assert len(run_calls) == 1 - cmd, kwargs = run_calls[0] - assert cmd == ["kubectl", "apply", "-f", str(manifest_path)] - assert kwargs.get("check") is True + # Verify apply_manifest was called with the correct path + assert len(apply_manifest_calls) == 1 + assert apply_manifest_calls[0] == manifest_path def test_bootstrap_hook_fails_if_manifest_not_downloaded(component): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py new file mode 100644 index 0000000..4dad53c --- /dev/null +++ b/tests/unit/test_client.py @@ -0,0 +1,715 @@ +"""Unit tests for Kubernetes client operations wrapper.""" + +import json +from unittest.mock import MagicMock + +import pytest + +from kube_galaxy.pkg.utils.client import ( + apply_manifest, + create_namespace, + delete_namespace, + describe_nodes, + get_api_server_status, + get_cluster_info, + get_context, + get_events, + get_nodes, + get_pod_data_json, + get_pod_logs, + get_pods, + verify_connectivity, + wait_for_nodes, + wait_for_pods, +) +from kube_galaxy.pkg.utils.errors import ClusterError +from kube_galaxy.pkg.utils.shell import ShellError + + +class TestVerifyConnectivity: + """Tests for verify_connectivity().""" + + def test_verify_connectivity_success(self, monkeypatch): + """Test successful cluster connectivity verification.""" + mock_which = MagicMock(return_value=True) + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.shutil.which", mock_which) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + verify_connectivity() + + mock_which.assert_called_once_with("kubectl") + mock_run.assert_called_once_with( + ["kubectl", "cluster-info"], check=True, capture_output=True + ) + + def test_verify_connectivity_kubectl_not_found(self, monkeypatch): + """Test error when kubectl is not available.""" + mock_which = MagicMock(return_value=None) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.shutil.which", mock_which) + + with pytest.raises(ClusterError, match="kubectl not found in PATH"): + verify_connectivity() + + def test_verify_connectivity_cluster_error(self, monkeypatch): + """Test error when cluster connection fails.""" + mock_which = MagicMock(return_value=True) + mock_run = MagicMock( + side_effect=ShellError(["kubectl", "cluster-info"], 1, "Connection refused") + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.shutil.which", mock_which) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to connect to cluster"): + verify_connectivity() + + +class TestGetContext: + """Tests for get_context().""" + + def test_get_context_success(self, monkeypatch): + """Test successful retrieval of current context.""" + result = MagicMock() + result.stdout = "docker-desktop\n" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + context = get_context() + + assert context == "docker-desktop" + mock_run.assert_called_once_with( + ["kubectl", "config", "current-context"], check=True, capture_output=True, text=True + ) + + def test_get_context_error(self, monkeypatch): + """Test error when context cannot be determined.""" + mock_run = MagicMock( + side_effect=ShellError(["kubectl", "config", "current-context"], 1, "Config error") + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to get current context"): + get_context() + + +class TestWaitForNodes: + """Tests for wait_for_nodes().""" + + def test_wait_for_nodes_success(self, monkeypatch): + """Test successful node readiness wait.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + wait_for_nodes(timeout=300, condition="Ready") + + mock_run.assert_called_once_with( + [ + "kubectl", + "wait", + "--for=condition=Ready", + "nodes", + "--all", + "--timeout=300s", + ], + check=True, + capture_output=True, + ) + + def test_wait_for_nodes_custom_condition(self, monkeypatch): + """Test wait with custom condition.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + wait_for_nodes(timeout=120, condition="Scheduled") + + mock_run.assert_called_once() + args, _kwargs = mock_run.call_args + assert "--for=condition=Scheduled" in args[0] + assert "--timeout=120s" in args[0] + + def test_wait_for_nodes_timeout_error(self, monkeypatch): + """Test timeout error when nodes don't reach condition.""" + mock_run = MagicMock( + side_effect=ShellError( + ["kubectl", "wait", "--for=condition=Ready", "nodes", "--all", "--timeout=300s"], + 124, + "Timeout", + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Nodes failed to reach Ready condition"): + wait_for_nodes() + + +class TestWaitForPods: + """Tests for wait_for_pods().""" + + def test_wait_for_pods_success(self, monkeypatch): + """Test successful pod readiness wait.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + wait_for_pods(namespace="kube-system", timeout=300, condition="Ready") + + mock_run.assert_called_once_with( + [ + "kubectl", + "wait", + "--for=condition=Ready", + "pod", + "--all", + "-n", + "kube-system", + "--timeout=300s", + ], + check=True, + capture_output=True, + ) + + def test_wait_for_pods_custom_namespace(self, monkeypatch): + """Test wait for pods in custom namespace.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + wait_for_pods(namespace="default", timeout=60) + + args, _kwargs = mock_run.call_args + assert "-n" in args[0] + assert "default" in args[0] + + def test_wait_for_pods_error(self, monkeypatch): + """Test error when pods don't reach condition.""" + mock_run = MagicMock( + side_effect=ShellError( + [ + "kubectl", + "wait", + "--for=condition=Ready", + "pod", + "--all", + "-n", + "kube-system", + "--timeout=300s", + ], + 1, + "Pod failed", + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Pods in kube-system failed to reach Ready"): + wait_for_pods() + + +class TestGetApiServerStatus: + """Tests for get_api_server_status().""" + + def test_get_api_server_status_success(self, monkeypatch): + """Test successful API server readiness check.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_api_server_status(timeout=300) + + mock_run.assert_called_once_with( + [ + "kubectl", + "get", + "--raw=/readyz", + "--request-timeout=300s", + ], + check=True, + capture_output=True, + ) + + def test_get_api_server_status_custom_timeout(self, monkeypatch): + """Test API server check with custom timeout.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_api_server_status(timeout=120) + + args, _kwargs = mock_run.call_args + assert "--request-timeout=120s" in args[0] + + def test_get_api_server_status_error(self, monkeypatch): + """Test error when API server is not ready.""" + mock_run = MagicMock( + side_effect=ShellError( + ["kubectl", "get", "--raw=/readyz", "--request-timeout=300s"], 1, "Not ready" + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="API server not ready"): + get_api_server_status() + + +class TestGetClusterInfo: + """Tests for get_cluster_info().""" + + def test_get_cluster_info_success(self, monkeypatch): + """Test successful cluster info retrieval.""" + result = MagicMock() + result.stdout = "Kubernetes control plane is running at https://localhost:6443" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + info = get_cluster_info() + + assert "Kubernetes control plane" in info + mock_run.assert_called_once_with( + ["kubectl", "cluster-info"], check=True, capture_output=True, text=True + ) + + def test_get_cluster_info_error(self, monkeypatch): + """Test error when cluster info cannot be retrieved.""" + mock_run = MagicMock( + side_effect=ShellError(["kubectl", "cluster-info"], 1, "Connection failed") + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to retrieve cluster info"): + get_cluster_info() + + +class TestGetNodes: + """Tests for get_nodes().""" + + def test_get_nodes_success(self, monkeypatch): + """Test successful nodes retrieval.""" + result = MagicMock() + result.stdout = ( + "NAME STATUS ROLES AGE\nnode1 Ready control-plane 5d" + ) + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + nodes = get_nodes() + + assert "node1" in nodes + mock_run.assert_called_once_with( + ["kubectl", "get", "nodes"], check=True, capture_output=True, text=True + ) + + def test_get_nodes_with_wide_output(self, monkeypatch): + """Test nodes retrieval with wide output.""" + result = MagicMock() + result.stdout = "NAME STATUS ROLES AGE INTERNAL-IP" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_nodes(wide=True) + + args, _kwargs = mock_run.call_args + assert "-o" in args[0] + assert "wide" in args[0] + + def test_get_nodes_error(self, monkeypatch): + """Test error when nodes cannot be retrieved.""" + mock_run = MagicMock(side_effect=ShellError(["kubectl", "get", "nodes"], 1, "No nodes")) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to retrieve nodes"): + get_nodes() + + +class TestGetPods: + """Tests for get_pods().""" + + def test_get_pods_all_namespaces(self, monkeypatch): + """Test pods retrieval from all namespaces.""" + result = MagicMock() + result.stdout = "NAMESPACE NAME READY STATUS" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + pods = get_pods() + + assert "NAMESPACE" in pods + args, _kwargs = mock_run.call_args + assert "-A" in args[0] + + def test_get_pods_specific_namespace(self, monkeypatch): + """Test pods retrieval from specific namespace.""" + result = MagicMock() + result.stdout = "NAME READY STATUS" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_pods(namespace="default") + + args, _kwargs = mock_run.call_args + assert "-n" in args[0] + assert "default" in args[0] + + def test_get_pods_with_wide_output(self, monkeypatch): + """Test pods retrieval with wide output.""" + result = MagicMock() + result.stdout = "NAME READY STATUS IP" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_pods(wide=True) + + args, _kwargs = mock_run.call_args + assert "-o" in args[0] + assert "wide" in args[0] + + def test_get_pods_with_format(self, monkeypatch): + """Test pods retrieval with specific output format.""" + result = MagicMock() + result.stdout = "[]" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_pods(output_format="json") + + args, _kwargs = mock_run.call_args + assert "-o" in args[0] + assert "json" in args[0] + + def test_get_pods_error(self, monkeypatch): + """Test error when pods cannot be retrieved.""" + mock_run = MagicMock(side_effect=ShellError(["kubectl", "get", "pods", "-A"], 1, "No pods")) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to retrieve pods"): + get_pods() + + +class TestGetPodDataJson: + """Tests for get_pod_data_json().""" + + def test_get_pod_data_json_success(self, monkeypatch): + """Test successful JSON pod data retrieval.""" + result = MagicMock() + pod_data = { + "items": [ + {"metadata": {"name": "pod1", "namespace": "default"}}, + {"metadata": {"name": "pod2", "namespace": "default"}}, + ] + } + result.stdout = json.dumps(pod_data) + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + pods = get_pod_data_json() + + assert len(pods) == 2 + assert pods[0]["metadata"]["name"] == "pod1" + + def test_get_pod_data_json_empty(self, monkeypatch): + """Test JSON pod data with empty results.""" + result = MagicMock() + result.stdout = '{"items": []}' + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + pods = get_pod_data_json() + + assert pods == [] + + def test_get_pod_data_json_specific_namespace(self, monkeypatch): + """Test JSON pod data from specific namespace.""" + result = MagicMock() + result.stdout = '{"items": []}' + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_pod_data_json(namespace="kube-system") + + args, _kwargs = mock_run.call_args + assert "-n" in args[0] + assert "kube-system" in args[0] + + def test_get_pod_data_json_shell_error(self, monkeypatch): + """Test error when pod data cannot be retrieved.""" + mock_run = MagicMock( + side_effect=ShellError( + ["kubectl", "get", "pods", "-A", "-o", "json"], 1, "Connection failed" + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to retrieve pods data"): + get_pod_data_json() + + def test_get_pod_data_json_parse_error(self, monkeypatch): + """Test error when JSON cannot be parsed.""" + result = MagicMock() + result.stdout = "invalid json" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to retrieve pods data"): + get_pod_data_json() + + +class TestDescribeNodes: + """Tests for describe_nodes().""" + + def test_describe_nodes_success(self, monkeypatch): + """Test successful node description retrieval.""" + result = MagicMock() + result.stdout = "Name: node1\nStatus: Ready" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + desc = describe_nodes() + + assert "node1" in desc + mock_run.assert_called_once_with( + ["kubectl", "describe", "nodes"], check=True, capture_output=True, text=True + ) + + def test_describe_nodes_error(self, monkeypatch): + """Test error when node descriptions cannot be retrieved.""" + mock_run = MagicMock( + side_effect=ShellError(["kubectl", "describe", "nodes"], 1, "No nodes") + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to describe nodes"): + describe_nodes() + + +class TestGetEvents: + """Tests for get_events().""" + + def test_get_events_all_namespaces(self, monkeypatch): + """Test events retrieval from all namespaces.""" + result = MagicMock() + result.stdout = "NAMESPACE NAME REASON" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + events = get_events() + + assert "NAMESPACE" in events + args, _kwargs = mock_run.call_args + assert "-A" in args[0] + + def test_get_events_specific_namespace(self, monkeypatch): + """Test events retrieval from specific namespace.""" + result = MagicMock() + result.stdout = "NAME REASON" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_events(namespace="default", all_namespaces=False) + + args, _kwargs = mock_run.call_args + assert "-n" in args[0] + assert "default" in args[0] + + def test_get_events_error(self, monkeypatch): + """Test error when events cannot be retrieved.""" + mock_run = MagicMock( + side_effect=ShellError(["kubectl", "get", "events", "-A"], 1, "No events") + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to retrieve events"): + get_events() + + +class TestGetPodLogs: + """Tests for get_pod_logs().""" + + def test_get_pod_logs_success(self, monkeypatch): + """Test successful pod logs retrieval.""" + result = MagicMock() + result.returncode = 0 + result.stdout = "Container started\nListening on port 8080" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + logs = get_pod_logs("default", "my-pod") + + assert "Container started" in logs + mock_run.assert_called_once_with( + ["kubectl", "logs", "-n", "default", "my-pod", "--tail=100"], + check=False, + capture_output=True, + text=True, + ) + + def test_get_pod_logs_custom_tail(self, monkeypatch): + """Test pod logs with custom tail lines.""" + result = MagicMock() + result.returncode = 0 + result.stdout = "Last 50 lines" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + get_pod_logs("default", "my-pod", tail=50) + + args, _kwargs = mock_run.call_args + assert "--tail=50" in args[0] + + def test_get_pod_logs_no_logs(self, monkeypatch): + """Test pod logs when pod has no logs (non-zero exit).""" + result = MagicMock() + result.returncode = 1 + result.stdout = "" + mock_run = MagicMock(return_value=result) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + logs = get_pod_logs("default", "my-pod") + + assert logs == "" + + +class TestCreateNamespace: + """Tests for create_namespace().""" + + def test_create_namespace_success(self, monkeypatch): + """Test successful namespace creation.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + create_namespace("test-ns") + + calls = mock_run.call_args_list + assert len(calls) == 1 + assert "kubectl" in calls[0][0][0] + assert "create" in calls[0][0][0] + assert "namespace" in calls[0][0][0] + assert "test-ns" in calls[0][0][0] + + def test_create_namespace_with_labels(self, monkeypatch): + """Test namespace creation with labels.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + create_namespace("test-ns", labels={"app": "test", "env": "dev"}) + + calls = mock_run.call_args_list + assert len(calls) == 2 + # Second call should be label command + assert "label" in calls[1][0][0] + assert "app=test" in calls[1][0][0] + assert "env=dev" in calls[1][0][0] + + def test_create_namespace_error(self, monkeypatch): + """Test error when namespace creation fails.""" + mock_run = MagicMock( + side_effect=ShellError( + ["kubectl", "create", "namespace", "test-ns"], 1, "Already exists" + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to create namespace test-ns"): + create_namespace("test-ns") + + +class TestDeleteNamespace: + """Tests for delete_namespace().""" + + def test_delete_namespace_success(self, monkeypatch): + """Test successful namespace deletion.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + delete_namespace("test-ns") + + mock_run.assert_called_once() + args, _kwargs = mock_run.call_args + assert "kubectl" in args[0] + assert "delete" in args[0] + assert "namespace" in args[0] + assert "test-ns" in args[0] + assert "--timeout" in args[0] + assert "60s" in args[0] + + def test_delete_namespace_custom_timeout(self, monkeypatch): + """Test namespace deletion with custom timeout.""" + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + delete_namespace("test-ns", timeout=120) + + args, _kwargs = mock_run.call_args + assert "120s" in args[0] + + def test_delete_namespace_not_found(self, monkeypatch): + """Test deleting non-existent namespace (should not fail).""" + mock_run = MagicMock( + side_effect=ShellError( + ["kubectl", "delete", "namespace", "test-ns", "--timeout", "60s"], 1, "not found" + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + # Should not raise error for "not found" + delete_namespace("test-ns") + + def test_delete_namespace_other_error(self, monkeypatch): + """Test error on namespace deletion failure (not "not found").""" + mock_run = MagicMock( + side_effect=ShellError( + ["kubectl", "delete", "namespace", "test-ns", "--timeout", "60s"], + 1, + "Permission denied", + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to delete namespace test-ns"): + delete_namespace("test-ns") + + +class TestApplyManifest: + """Tests for apply_manifest().""" + + def test_apply_manifest_success(self, monkeypatch, tmp_path): + """Test successful manifest application.""" + manifest_file = tmp_path / "test.yaml" + manifest_file.write_text("apiVersion: v1\nkind: Pod") + + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + apply_manifest(manifest_file) + + mock_run.assert_called_once() + args, _kwargs = mock_run.call_args + assert "kubectl" in args[0] + assert "apply" in args[0] + assert "-f" in args[0] + assert str(manifest_file) in args[0] + + def test_apply_manifest_with_string_path(self, monkeypatch, tmp_path): + """Test manifest application with string path.""" + manifest_file = tmp_path / "test.yaml" + manifest_file.write_text("apiVersion: v1") + + mock_run = MagicMock() + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + apply_manifest(str(manifest_file)) + + mock_run.assert_called_once() + + def test_apply_manifest_file_not_found(self, monkeypatch): + """Test error when manifest file does not exist.""" + with pytest.raises(ClusterError, match="Manifest not found"): + apply_manifest("/nonexistent/manifest.yaml") + + def test_apply_manifest_error(self, monkeypatch, tmp_path): + """Test error when manifest application fails.""" + manifest_file = tmp_path / "test.yaml" + manifest_file.write_text("apiVersion: v1") + + mock_run = MagicMock( + side_effect=ShellError( + ["kubectl", "apply", "-f", str(manifest_file)], 1, "Invalid manifest" + ) + ) + monkeypatch.setattr("kube_galaxy.pkg.utils.client.run", mock_run) + + with pytest.raises(ClusterError, match="Failed to apply manifest"): + apply_manifest(manifest_file) diff --git a/tests/unit/test_status.py b/tests/unit/test_status.py index 4d5f569..ee7d32c 100644 --- a/tests/unit/test_status.py +++ b/tests/unit/test_status.py @@ -1,67 +1,63 @@ -from types import SimpleNamespace - import pytest import typer from kube_galaxy import cli from kube_galaxy.cmd import status as status_cmd -from kube_galaxy.pkg.utils.shell import ShellError +from kube_galaxy.pkg.utils.errors import ClusterError def test_status_wait_runs_readiness_checks(monkeypatch): - calls: list[list[str]] = [] - - def fake_run(command: list[str], **_kwargs): - calls.append(command) - - if command[:3] == ["kubectl", "config", "current-context"]: - return SimpleNamespace(returncode=0, stdout="test-context\n", stderr="") - if command[:3] == ["kubectl", "get", "nodes"] and "-o" not in command: - return SimpleNamespace( - returncode=0, - stdout="NAME STATUS ROLES AGE VERSION\nnode-1 Ready control-plane 1m v1.36.0\n", - stderr="", - ) - - return SimpleNamespace(returncode=0, stdout="ok\n", stderr="") - - monkeypatch.setattr(status_cmd, "run", fake_run) + """Test that status with --wait runs node and pod readiness checks.""" + wait_for_nodes_called = [] + wait_for_pods_called = [] + + def fake_wait_for_nodes(timeout=300, condition="Ready"): + wait_for_nodes_called.append({"timeout": timeout, "condition": condition}) + + def fake_wait_for_pods(namespace="kube-system", timeout=300, condition="Ready"): + wait_for_pods_called.append( + {"namespace": namespace, "timeout": timeout, "condition": condition} + ) + + def fake_get_context(): + return "test-context" + + def fake_get_nodes(): + return "NAME STATUS ROLES AGE VERSION\nnode-1 Ready control-plane 1m v1.36.0\n" + + monkeypatch.setattr(status_cmd, "wait_for_nodes", fake_wait_for_nodes) + monkeypatch.setattr(status_cmd, "wait_for_pods", fake_wait_for_pods) + monkeypatch.setattr(status_cmd, "get_context", fake_get_context) + monkeypatch.setattr(status_cmd, "get_nodes", fake_get_nodes) + monkeypatch.setattr(status_cmd, "get_cluster_info", lambda: "cluster-info") + monkeypatch.setattr(status_cmd, "get_pods", lambda: "pods-output") monkeypatch.setattr(status_cmd, "_check_command", lambda _cmd: "ok") monkeypatch.setattr(status_cmd.shutil, "which", lambda _cmd: "/usr/bin/tool") status_cmd.status(wait=True, timeout=123) - assert [ - "kubectl", - "wait", - "--for=condition=Ready", - "node", - "--all", - "--timeout=123s", - ] in calls - assert [ - "kubectl", - "wait", - "--for=condition=Ready", - "pod", - "--all", - "-n", - "kube-system", - "--timeout=123s", - ] in calls + assert len(wait_for_nodes_called) == 1 + assert wait_for_nodes_called[0]["timeout"] == 123 + assert len(wait_for_pods_called) == 1 + assert wait_for_pods_called[0]["timeout"] == 123 + assert wait_for_pods_called[0]["namespace"] == "kube-system" def test_status_wait_exits_non_zero_on_readiness_failure(monkeypatch): - def fake_run(command: list[str], **_kwargs): - if command[:5] == ["kubectl", "wait", "--for=condition=Ready", "node", "--all"]: - raise ShellError(command, 1, "timed out waiting for node readiness") + """Test that status exits with error code on readiness failure.""" + + def fake_wait_for_nodes(timeout=300, condition="Ready"): + raise ClusterError("timed out waiting for node readiness") - if command[:3] == ["kubectl", "config", "current-context"]: - return SimpleNamespace(returncode=0, stdout="test-context\n", stderr="") + def fake_get_context(): + return "test-context" - return SimpleNamespace(returncode=0, stdout="ok\n", stderr="") + def fake_get_nodes(): + return "NAME STATUS ROLES AGE VERSION\nnode-1 Ready control-plane 1m v1.36.0\n" - monkeypatch.setattr(status_cmd, "run", fake_run) + monkeypatch.setattr(status_cmd, "wait_for_nodes", fake_wait_for_nodes) + monkeypatch.setattr(status_cmd, "get_context", fake_get_context) + monkeypatch.setattr(status_cmd, "get_nodes", fake_get_nodes) monkeypatch.setattr(status_cmd, "_check_command", lambda _cmd: "ok") monkeypatch.setattr(status_cmd.shutil, "which", lambda _cmd: "/usr/bin/tool")