diff --git a/acto/__main__.py b/acto/__main__.py
index d01ddd2fc..dea30af67 100644
--- a/acto/__main__.py
+++ b/acto/__main__.py
@@ -10,6 +10,7 @@
 from acto.engine import Acto, apply_testcase
 from acto.input.input import DeterministicInputModel
+from acto.kubernetes_engine import base, kind, provided
 from acto.lib.operator_config import OperatorConfig
 from acto.post_process.post_diff_test import PostDiffTest
 from acto.utils.error_handler import handle_excepthook, thread_excepthook
@@ -133,11 +134,28 @@
 apply_testcase_f = apply_testcase

+kubernetes_engine: base.KubernetesEngine
+if config.kubernetes_engine.self_provided:
+    kubernetes_engine = provided.ProvidedKubernetesEngine(
+        acto_namespace=0,
+        feature_gates=config.kubernetes_engine.feature_gates,
+        num_nodes=config.num_nodes,
+        version=config.kubernetes_version,
+        provided=config.kubernetes_engine.self_provided,
+    )
+else:
+    kubernetes_engine = kind.Kind(
+        acto_namespace=0,
+        feature_gates=config.kubernetes_engine.feature_gates,
+        num_nodes=config.num_nodes,
+        version=config.kubernetes_version,
+    )
+
 start_time = datetime.now()
 acto = Acto(
     workdir_path=args.workdir_path,
     operator_config=config,
-    cluster_runtime="KIND",
+    kubernetes_engine=kubernetes_engine,
     context_file=context_cache,
     helper_crd=args.helper_crd,
     num_workers=args.num_workers,
diff --git a/acto/common.py b/acto/common.py
index 9ad528b23..057f0e97a 100644
--- a/acto/common.py
+++ b/acto/common.py
@@ -5,7 +5,7 @@
 import random
 import re
 import string
-from typing import Any, Sequence, Tuple, TypeAlias, Union
+from typing import Any, Optional, Sequence, Tuple, TypeAlias, Union

 import deepdiff.model as deepdiff_model
 import kubernetes
@@ -391,7 +391,7 @@ def translate_op(input_op: str):


 def kubernetes_client(
-    kubeconfig: str, context_name: str
+    kubeconfig: str, context_name: Optional[str]
 ) -> kubernetes.client.ApiClient:
     """Create a kubernetes client from kubeconfig and context name"""
     return kubernetes.config.kube_config.new_client_from_config(
diff --git a/acto/engine.py b/acto/engine.py
index bf8dcb88c..4528c029a 100644
--- a/acto/engine.py
+++ b/acto/engine.py
@@ -33,7 +33,7 @@
 from acto.input.testplan import TestGroup
 from acto.input.value_with_schema import ValueWithSchema, attach_schema_to_value
 from acto.kubectl_client import KubectlClient
-from acto.kubernetes_engine import base, kind
+from acto.kubernetes_engine import base
 from acto.lib.operator_config import OperatorConfig
 from acto.oracle_handle import OracleHandle
 from acto.result import (
@@ -149,12 +149,20 @@ def check_state_equality(
     prev_pods = prev_system_state["pod"]

     for k, v in curr_pods.items():
-        if "owner_reference" in v["metadata"] and v["metadata"]["owner_reference"] is not None and ["owner_references"][0]["kind"] == "Job":
+        if (
+            "owner_references" in v["metadata"]
+            and v["metadata"]["owner_references"] is not None
+            and v["metadata"]["owner_references"][0]["kind"] == "Job"
+        ):
             continue
         curr_system_state[k] = v
-
+
     for k, v in prev_pods.items():
-        if "owner_reference" in v["metadata"] and v["metadata"]["owner_reference"] is not None and ["owner_references"][0]["kind"] == "Job":
+        if (
+            "owner_references" in v["metadata"]
+            and v["metadata"]["owner_references"] is not None
+            and v["metadata"]["owner_references"][0]["kind"] == "Job"
+        ):
             continue
         prev_system_state[k] = v

@@ -255,7 +263,7 @@ def __init__(
         self,
         runner_t: type,
         checker_t: type,
         wait_time: int,
-        custom_on_init: Optional[Callable],
+        custom_on_init: Optional[list[Callable]],
         custom_checker: Optional[type[CheckerInterface]],
         workdir: str,
         cluster: base.KubernetesEngine,
@@ -805,7 +813,7 @@ def __init__(
         self,
         workdir_path: str,
         operator_config: OperatorConfig,
-        cluster_runtime: str,
+        kubernetes_engine: base.KubernetesEngine,
         context_file: str,
         helper_crd: Optional[str],
         num_workers: int,
@@ -833,26 +841,7 @@ def __init__(

         deploy = Deploy(operator_config.deploy)

-        if cluster_runtime == "KIND":
-            cluster = kind.Kind(
-                acto_namespace=acto_namespace,
-                feature_gates=operator_config.kubernetes_engine.feature_gates,
-                num_nodes=operator_config.num_nodes,
-                version=operator_config.kubernetes_version,
-            )
-        else:
-            logger.warning(
-                "Cluster Runtime %s is not supported, defaulted to use kind",
-                cluster_runtime,
-            )
-            cluster = kind.Kind(
-                acto_namespace=acto_namespace,
-                feature_gates=operator_config.kubernetes_engine.feature_gates,
-                num_nodes=operator_config.num_nodes,
-                version=operator_config.kubernetes_version,
-            )
-
-        self.cluster = cluster
+        self.cluster = kubernetes_engine
         self.deploy = deploy
         self.operator_config = operator_config
         self.crd_name = operator_config.crd_name
@@ -889,7 +878,7 @@ def __init__(
         self.sequence_base = 0

         self.custom_oracle: Optional[type[CheckerInterface]] = None
-        self.custom_on_init: Optional[Callable] = None
+        self.custom_on_init: Optional[list[Callable]] = None
         if operator_config.custom_oracle is not None:
             module = importlib.import_module(operator_config.custom_oracle)
             if hasattr(module, "CUSTOM_CHECKER") and issubclass(
diff --git a/acto/kubernetes_engine/base.py b/acto/kubernetes_engine/base.py
index c8cebe519..36a8ed94f 100644
--- a/acto/kubernetes_engine/base.py
+++ b/acto/kubernetes_engine/base.py
@@ -6,6 +6,7 @@
 import kubernetes

 from acto.constant import CONST
+from acto.lib.operator_config import SelfProvidedKubernetesConfig
 from acto.utils import get_thread_logger

 KubernetesEnginePostHookType = Callable[[kubernetes.client.ApiClient], None]
@@ -22,13 +23,17 @@ def __init__(
         feature_gates: Optional[dict[str, bool]] = None,
         num_nodes=1,
         version="",
+        provided: Optional[SelfProvidedKubernetesConfig] = None,
     ) -> None:
         """Constructor for KubernetesEngine

         Args:
             acto_namespace: the namespace of the acto
             posthooks: a list of posthooks to be executed after the cluster is created
-            feature_gates: a list of feature gates to be enabled
+            feature_gates: a list of Kubernetes feature gates to be enabled
+            num_nodes: the number of nodes in the cluster
+            version: the version of Kubernetes
+            provided: the configuration for a self-provided Kubernetes engine
         """

     @abstractmethod
@@ -91,10 +96,10 @@ def restart_cluster(self, name: str, kubeconfig: str):
                 continue
             break

-    def get_node_list(self, name: str):
+    def get_node_list(self, name: str) -> list[str]:
         """Fetch the name of worker nodes inside a cluster

         Args:
-            1. name: name of the cluster name
+            name: name of the cluster
         """
         _ = get_thread_logger(with_prefix=False)
diff --git a/acto/kubernetes_engine/kind.py b/acto/kubernetes_engine/kind.py
index ee5c1d00c..b7bbec074 100644
--- a/acto/kubernetes_engine/kind.py
+++ b/acto/kubernetes_engine/kind.py
@@ -2,7 +2,7 @@
 import os
 import subprocess
 import time
-from typing import Any, Dict, List, Optional
+from typing import Any, Optional

 import kubernetes
 import yaml
@@ -19,8 +19,8 @@ class Kind(base.KubernetesEngine):
     def __init__(
         self,
         acto_namespace: int,
-        posthooks: List[base.KubernetesEnginePostHookType] = None,
-        feature_gates: Dict[str, bool] = None,
+        posthooks: Optional[list[base.KubernetesEnginePostHookType]] = None,
+        feature_gates: Optional[dict[str, bool]] = None,
         num_nodes=1,
         version: Optional[str] = None,
     ):
@@ -140,7 +140,7 @@

         if self._posthooks:
             for posthook in self._posthooks:
-                posthook(apiclient=apiclient)
+                posthook(apiclient)

     def load_images(self, images_archive_path: str, name: str):
         logging.info("Loading preload images")
diff --git a/acto/kubernetes_engine/provided.py b/acto/kubernetes_engine/provided.py
new file mode 100644
index 000000000..2dc0e98c5
--- /dev/null
+++ b/acto/kubernetes_engine/provided.py
@@ -0,0 +1,113 @@
+import logging
+import subprocess
+from typing import Optional
+
+import kubernetes
+
+from acto.common import kubernetes_client, print_event
+from acto.constant import CONST
+
+from . import base
+
+
+class ProvidedKubernetesEngine(base.KubernetesEngine):
+    """KubernetesEngine for a user-provided k8s cluster
+
+    Configuration for a user-provided k8s cluster is very limited,
+    as it is assumed that the user has already set up the cluster.
+    Everything needs to be deployed in the ACTO_NAMESPACE to provide the
+    necessary isolation.
+ """ + + def __init__( + self, + acto_namespace: int, + posthooks: Optional[list[base.KubernetesEnginePostHookType]] = None, + feature_gates: Optional[dict[str, bool]] = None, + num_nodes: int = 1, + version: Optional[str] = None, + provided: Optional[base.SelfProvidedKubernetesConfig] = None, + ): + self._posthooks = posthooks + + if feature_gates: + logging.error("Feature gates are not supported in provided k8s") + + if num_nodes != 1: + logging.error("num_nodes is not supported in provided k8s") + + if version: + logging.error("version is not supported in provided k8s") + + if provided is None: + raise ValueError("Missing configuration for provided k8s") + self._kube_config = provided.kube_config + self._kube_context = provided.kube_context + + def get_context_name(self, cluster_name: str) -> str: + """Returns the kubecontext based on the cluster name + KIND always adds `kind` before the cluster name + """ + return self._kube_context + + def create_cluster(self, name: str, kubeconfig: str): + """Does nothing as the cluster is already created + Args: + name: name of the cluster + config: path of the config file for cluster + version: k8s version + """ + print_event("Connecting to a user-provided Kubernetes cluster...") + + try: + kubernetes.config.load_kube_config( + config_file=self._kube_config, context=self._kube_context + ) + apiclient = kubernetes_client(self._kube_config, self._kube_context) + except Exception as e: + logging.debug("Incorrect kube config file:") + with open(self._kube_config, encoding="utf-8") as f: + logging.debug(f.read()) + raise e + + if self._posthooks: + for posthook in self._posthooks: + posthook(apiclient) + + def load_images(self, images_archive_path: str, name: str): + logging.info("Loading preload images") + cmd = ["kind", "load", "image-archive"] + if images_archive_path is None: + logging.warning( + "No image to preload, we at least should have operator image" + ) + + if name is not None: + cmd.extend(["--name", name]) + else: + logging.error("Missing cluster name for kind load") + + p = subprocess.run(cmd + [images_archive_path], check=False) + if p.returncode != 0: + logging.error("Failed to preload images archive") + + def delete_cluster(self, name: str, kubeconfig: str): + """Cluster deletion via deleting the acto-namespace + Args: + name: name of the cluster + kubeconfig: path of the config file for cluster + kubecontext: context of the cluster + """ + logging.info("Deleting cluster %s", name) + apiclient = kubernetes_client(self._kube_config, self._kube_context) + core_v1 = kubernetes.client.CoreV1Api(apiclient) + core_v1.delete_namespace( + CONST.ACTO_NAMESPACE, propagation_policy="Foreground" + ) + + def get_node_list(self, name: str) -> list[str]: + """We don't have a way to get the node list for a user-provided cluster + Args: + Name of the cluster + """ + return [] diff --git a/acto/lib/operator_config.py b/acto/lib/operator_config.py index 8df0e0bb7..cd998d5f9 100644 --- a/acto/lib/operator_config.py +++ b/acto/lib/operator_config.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Any, Optional import pydantic from typing_extensions import Self @@ -134,12 +134,49 @@ class AnalysisConfig(pydantic.BaseModel, extra="forbid"): ) +class SelfProvidedKubernetesConfig(pydantic.BaseModel, extra="forbid"): + """Configuration for self-provided Kubernetes engine""" + + kube_config: str = pydantic.Field( + description="Path to the kubeconfig file for the Kubernetes cluster" + ) + kube_context: str = pydantic.Field( + 
description="Context name for the Kubernetes cluster" + ) + + class KubernetesEngineConfig(pydantic.BaseModel, extra="forbid"): """Configuration for Kubernetes""" + num_nodes: int = pydantic.Field( + description="Number of workers in the Kubernetes cluster", default=4 + ) + kubernetes_version: str = pydantic.Field( + default="v1.28.0", description="Kubernetes version" + ) feature_gates: dict[str, bool] = pydantic.Field( description="Path to the feature gates file", default=None ) + self_provided: Optional[SelfProvidedKubernetesConfig] = pydantic.Field( + default=None, + description="Configuration for self-provided Kubernetes engine", + ) + + @pydantic.model_validator(mode="before") + @classmethod + def check_self_provided(cls, data: Any) -> Any: + """Check if the self-provided Kubernetes engine is valid""" + if isinstance(data, dict) and "self_provided" in data: + if ( + "num_nodes" in data + or "kubernetes_version" in data + or "feature_gates" in data + ): + raise ValueError( + "num_nodes, kubernetes_version, and feature_gates " + + "are not supported in self-provided Kubernetes engine" + ) + return data class OperatorConfig(pydantic.BaseModel, extra="forbid"): diff --git a/acto/post_process/post_diff_test.py b/acto/post_process/post_diff_test.py index 03c8666e7..3dfc73f54 100644 --- a/acto/post_process/post_diff_test.py +++ b/acto/post_process/post_diff_test.py @@ -29,7 +29,7 @@ from acto.common import invalid_input_message_regex, kubernetes_client from acto.deploy import Deploy from acto.kubectl_client.kubectl import KubectlClient -from acto.kubernetes_engine import base, kind +from acto.kubernetes_engine import base, kind, provided from acto.lib.operator_config import OperatorConfig from acto.post_process.post_process import PostProcessor from acto.result import ( @@ -692,12 +692,23 @@ def post_process(self, workdir: str, num_workers: int = 1): """Start the post process""" if not os.path.exists(workdir): os.mkdir(workdir) - cluster = kind.Kind( - acto_namespace=self.acto_namespace, - feature_gates=self.config.kubernetes_engine.feature_gates, - num_nodes=self.config.num_nodes, - version=self.config.kubernetes_version, - ) + + kubernetes_engine: base.KubernetesEngine + if self.config.kubernetes_engine.self_provided: + kubernetes_engine = provided.ProvidedKubernetesEngine( + acto_namespace=self.acto_namespace, + feature_gates=self.config.kubernetes_engine.feature_gates, + num_nodes=self.config.num_nodes, + version=self.config.kubernetes_version, + provided=self.config.kubernetes_engine.self_provided, + ) + else: + kubernetes_engine = kind.Kind( + acto_namespace=self.acto_namespace, + feature_gates=self.config.kubernetes_engine.feature_gates, + num_nodes=self.config.num_nodes, + version=self.config.kubernetes_version, + ) deploy = Deploy(self.config.deploy) # Build an archive to be preloaded images_archive = os.path.join(workdir, "images.tar") @@ -722,7 +733,7 @@ def post_process(self, workdir: str, num_workers: int = 1): self.context, deploy, workdir, - cluster, + kubernetes_engine, i, self.acto_namespace, ) @@ -796,12 +807,23 @@ def check_diff_test_result( additional_runner_dir = os.path.join( workdir, f"additional-runner-{worker_id}" ) - cluster = kind.Kind( - acto_namespace=self.acto_namespace, - feature_gates=self.config.kubernetes_engine.feature_gates, - num_nodes=self.config.num_nodes, - version=self.config.kubernetes_version, - ) + + kubernetes_engine: base.KubernetesEngine + if self.config.kubernetes_engine.self_provided: + kubernetes_engine = provided.ProvidedKubernetesEngine( + 
+                acto_namespace=self.acto_namespace,
+                feature_gates=self.config.kubernetes_engine.feature_gates,
+                num_nodes=self.config.num_nodes,
+                version=self.config.kubernetes_version,
+                provided=self.config.kubernetes_engine.self_provided,
+            )
+        else:
+            kubernetes_engine = kind.Kind(
+                acto_namespace=self.acto_namespace,
+                feature_gates=self.config.kubernetes_engine.feature_gates,
+                num_nodes=self.config.num_nodes,
+                version=self.config.kubernetes_version,
+            )

         deploy = Deploy(self.config.deploy)

@@ -809,7 +831,7 @@ def check_diff_test_result(
             context=self.context,
             deploy=deploy,
             workdir=additional_runner_dir,
-            cluster=cluster,
+            cluster=kubernetes_engine,
             worker_id=worker_id,
             acto_namespace=self.acto_namespace,
         )
diff --git a/acto/reproduce.py b/acto/reproduce.py
index 791f26418..e98d92863 100644
--- a/acto/reproduce.py
+++ b/acto/reproduce.py
@@ -19,6 +19,7 @@
 from acto.input.testcase import TestCase
 from acto.input.testplan import TestGroup
 from acto.input.value_with_schema import ValueWithSchema
+from acto.kubernetes_engine import base, kind, provided
 from acto.lib.operator_config import OperatorConfig
 from acto.post_process.post_diff_test import PostDiffTest
 from acto.result import OracleResults
@@ -190,9 +191,9 @@ def get_seed_input(self) -> dict:
         return self.seed_input

     def generate_test_plan(
-        self, delta_from: str = None, focus_fields: list = None
+        self,
+        focus_fields: Optional[list] = None,
     ) -> dict:
-        _ = delta_from
         _ = focus_fields
         return {}

@@ -231,7 +232,6 @@ def reproduce(
     reproduce_dir: str,
     operator_config: str,
     acto_namespace: int,
-    **kwargs,
 ) -> List[OracleResults]:
     """Reproduce the trial folder"""
     os.makedirs(workdir_path, exist_ok=True)
@@ -246,7 +246,7 @@ def reproduce(
     logging.getLogger("sh").setLevel(logging.ERROR)

     with open(operator_config, "r", encoding="utf-8") as config_file:
-        config = OperatorConfig(**json.load(config_file))
+        config = OperatorConfig.model_validate(json.load(config_file))
     context_cache = os.path.join(
         os.path.dirname(config.seed_custom_resource), "context.json"
     )
@@ -255,11 +255,27 @@ def reproduce(
     )
     apply_testcase_f = apply_repro_testcase

+    kubernetes_engine: base.KubernetesEngine
+    if config.kubernetes_engine.self_provided:
+        kubernetes_engine = provided.ProvidedKubernetesEngine(
+            acto_namespace=0,
+            feature_gates=config.kubernetes_engine.feature_gates,
+            num_nodes=config.num_nodes,
+            version=config.kubernetes_version,
+            provided=config.kubernetes_engine.self_provided,
+        )
+    else:
+        kubernetes_engine = kind.Kind(
+            acto_namespace=0,
+            feature_gates=config.kubernetes_engine.feature_gates,
+            num_nodes=config.num_nodes,
+            version=config.kubernetes_version,
+        )
+
     acto = Acto(
         workdir_path=workdir_path,
         operator_config=config,
-        cluster_runtime=kwargs["cluster_runtime"],
-        preload_images_=[],
+        kubernetes_engine=kubernetes_engine,
         context_file=context_cache,
         helper_crd=None,
         num_workers=1,
@@ -272,7 +288,7 @@ def reproduce(
         acto_namespace=acto_namespace,
     )

-    errors = acto.run(modes=["normal"])
+    errors = acto.run()
     return [error for error in errors if error is not None]
@@ -344,6 +360,5 @@ def reproduce_postdiff(
         reproduce_dir=args.reproduce_dir,
         operator_config=args.config,
         acto_namespace=args.acto_namespace,
-        cluster_runtime=args.cluster_runtime,
     )
     end_time = datetime.now()
diff --git a/test/integration_tests/test_kubernetes_engines.py b/test/integration_tests/test_kubernetes_engines.py
index 03fafc056..ab976893c 100644
--- a/test/integration_tests/test_kubernetes_engines.py
+++ b/test/integration_tests/test_kubernetes_engines.py
@@ -8,6 +8,8 @@
 from acto.kubernetes_engine.base import KubernetesEngine
 from acto.kubernetes_engine.kind import Kind
 from acto.kubernetes_engine.minikube import Minikube
+from acto.kubernetes_engine.provided import ProvidedKubernetesEngine
+from acto.lib.operator_config import SelfProvidedKubernetesConfig

 testcases = [("kind", 4, "v1.27.3")]

@@ -43,3 +45,21 @@ def test_kubernetes_engines(cluster_type: str, num_nodes, version):
     # expect to raise RuntimeError
     # get_node_list should raise RuntimeError when cluster is not found
     cluster_instance.get_node_list(name)
+
+
+@pytest.mark.kubernetes_engine
+def test_user_provided_kubernetes():
+    """Test creating a user provided kubernetes cluster from Kind"""
+    config_path = os.path.join(os.path.expanduser("~"), ".kube/test-config")
+    name = "test-cluster"
+    cluster_instance = Kind(acto_namespace=0, num_nodes=1, version="v1.27.3")
+    cluster_instance.restart_cluster(name, config_path)
+
+    provided = ProvidedKubernetesEngine(
+        acto_namespace=0,
+        provided=SelfProvidedKubernetesConfig(
+            kube_config=config_path,
+            kube_context=cluster_instance.get_context_name(name),
+        ),
+    )
+    provided.create_cluster(name, config_path)
diff --git a/test/integration_tests/test_learn.py b/test/integration_tests/test_learn.py
index 6195ba37a..fffb489ab 100644
--- a/test/integration_tests/test_learn.py
+++ b/test/integration_tests/test_learn.py
@@ -6,6 +6,7 @@
 from acto.engine import Acto, apply_testcase
 from acto.input.input import DeterministicInputModel
+from acto.kubernetes_engine import kind
 from acto.lib.operator_config import OperatorConfig

 test_dir = pathlib.Path(__file__).parent.resolve()
@@ -33,10 +34,17 @@ def test_statefulset_operator(self):
             os.path.dirname(config.seed_custom_resource), "context.json"
         )

+        kubernetes_engine = kind.Kind(
+            acto_namespace=0,
+            feature_gates=config.kubernetes_engine.feature_gates,
+            num_nodes=config.num_nodes,
+            version=config.kubernetes_version,
+        )
+
         Acto(
             workdir_path=workdir_path,
             operator_config=config,
-            cluster_runtime="KIND",
+            kubernetes_engine=kubernetes_engine,
             context_file=context_cache,
             helper_crd=None,
             num_workers=1,
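
Usage sketch (illustrative, not part of the patch): wiring Acto to an existing cluster through the new ProvidedKubernetesEngine, mirroring test_user_provided_kubernetes above; the kubeconfig path and context name below are placeholders.

    from acto.kubernetes_engine.provided import ProvidedKubernetesEngine
    from acto.lib.operator_config import SelfProvidedKubernetesConfig

    # Both classes are introduced by this patch; path and context are placeholders.
    engine = ProvidedKubernetesEngine(
        acto_namespace=0,
        provided=SelfProvidedKubernetesConfig(
            kube_config="/home/user/.kube/config",  # placeholder kubeconfig path
            kube_context="my-existing-cluster",     # placeholder context name
        ),
    )
    # For the self-provided engine, create_cluster() only verifies connectivity
    # and runs posthooks; no cluster is provisioned.
    engine.create_cluster(name="existing", kubeconfig="/home/user/.kube/config")

In an operator config file, the same setup corresponds to a kubernetes_engine.self_provided entry carrying kube_config and kube_context; per the new model validator, num_nodes, kubernetes_version, and feature_gates must be omitted when self_provided is set.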