diff --git a/acto/runner/runner.py b/acto/runner/runner.py
index c3214e8061..f3b6f953ed 100644
--- a/acto/runner/runner.py
+++ b/acto/runner/runner.py
@@ -15,6 +15,7 @@
 from acto.kubectl_client import KubectlClient
 from acto.snapshot import Snapshot
 from acto.utils import acto_timer, get_thread_logger
+from acto.utils.k8s_event_watcher import K8sEventWatcher
 
 RunnerHookType = Callable[[kubernetes.client.ApiClient], None]
 CustomSystemStateHookType = Callable[
@@ -456,7 +457,7 @@ def wait_for_system_converge(self, hard_timeout=480) -> bool:
         while True:
             try:
                 event = combined_event_queue.get(timeout=self.wait_time)
-                if event == "timeout":
+                if event in ["timeout", K8sEventWatcher.ABORT]:
                     converge = False
                     break
             except queue.Empty:
@@ -590,9 +591,13 @@ def wait_for_system_converge(self, hard_timeout=480) -> bool:
 
     def watch_system_events(self, event_stream, q: multiprocessing.Queue):
         """A process that watches namespaced events"""
-        for _ in event_stream:
+
+        watcher = K8sEventWatcher(q)
+
+        for payload in event_stream:
             try:
                 q.put("event")
+                watcher.observe(payload)
             except (ValueError, AssertionError):
                 pass
 
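The runner change above treats K8sEventWatcher.ABORT as a second sentinel alongside "timeout": every raw watch payload still counts as activity via q.put("event"), but it is now also fed to the watcher, which can push the abort sentinel onto the same queue. A minimal sketch of that flow in isolation (the event fields below are fabricated for illustration; the threshold of 3 is the shipped default in k8s_event_watcher_config):

import json
import multiprocessing

from acto.utils.k8s_event_watcher import K8sEventWatcher

q = multiprocessing.Queue()
watcher = K8sEventWatcher(q)

# A watch payload is a JSON byte string with the usual watch envelope:
# a "type" field plus an "object" holding the core/v1 Event.
payload = json.dumps(
    {
        "type": "ADDED",
        "object": {
            "reason": "ErrImageNeverPull",
            "message": "illustrative message",
            "involvedObject": {"uid": "example-uid"},
        },
    }
).encode("utf-8")

for _ in range(3):  # default_threshold is 3 for this reason
    watcher.observe(payload)

# the abort request arrives on the same queue the runner drains
assert q.get() == K8sEventWatcher.ABORT
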
diff --git a/acto/utils/k8s_event_watcher.py b/acto/utils/k8s_event_watcher.py
new file mode 100644
index 0000000000..6cfa874bba
--- /dev/null
+++ b/acto/utils/k8s_event_watcher.py
@@ -0,0 +1,135 @@
+"""Determines whether the cluster is stuck in an unhealthy state by interpreting K8s events"""
+
+import copy
+import json
+import multiprocessing
+from typing import Callable, Optional
+
+from acto.utils.thread_logger import get_thread_logger
+
+
+class Predicate:
+    """Predicate for deciding whether to abort"""
+
+    def __init__(
+        self,
+        reason: str,
+        message_filter: Callable[[Optional[str]], bool] = lambda x: True,
+        threshold: Optional[int] = None,
+    ) -> None:
+        self.reason = reason
+        self.message_filter = message_filter
+        # event count threshold for deciding an abort
+        self.threshold = threshold
+
+    def match(self, reason: str, message: str = "") -> bool:
+        """Decide whether a K8s event's reason and message match this predicate"""
+        return self.reason == reason and self.message_filter(message)
+
+    def __str__(self) -> str:
+        return f"(reason: {self.reason}, count_threshold: {self.threshold})"
+
+
+# todo: unify this with other Acto configs
+k8s_event_watcher_config = {
+    "default_threshold": 3,
+    "abort_predicates": [
+        # a full list of kubelet-emitted reasons can be found at
+        # https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go
+        # event reasons emitted by K8s controllers, however, unfortunately
+        # need to be scanned from source code
+        # kubelet reasons, todo: add fine-calibrated message filters
+        Predicate("Failed"),
+        Predicate("BackOff", threshold=5),
+        Predicate("FailedCreatePodContainer"),
+        Predicate("ErrImageNeverPull"),
+        Predicate("FailedAttachVolume"),
+        Predicate("FailedMount"),
+        Predicate("VolumeResizeFailed"),
+        Predicate("FileSystemResizeFailed"),
+        Predicate("FailedMapVolume"),
+        # scheduling may fail due to node.kubernetes.io/not-ready, which
+        # should be transient, so filter for the truly alarming messages
+        Predicate(
+            "FailedScheduling",
+            lambda msg: any(
+                keyword in msg
+                for keyword in [
+                    "affinity",
+                    "Insufficient memory",
+                    "Insufficient cpu",
+                ]
+            ),
+        ),
+    ],
+}
+
+
+class K8sEventWatcher:
+    """Watch for K8s events that might signal an unresolvable state
+    and request Acto to abort the convergence wait"""
+
+    ABORT = "k8s_event_watcher_abort_request"
+
+    def __init__(self, output_channel: multiprocessing.Queue) -> None:
+        self.logger = get_thread_logger(with_prefix=True)
+        self.output_channel = output_channel
+        self.counter = {}
+        self.abort_requested = False
+        self.abort_predicates = copy.deepcopy(
+            k8s_event_watcher_config.get("abort_predicates", [])
+        )
+        for predicate in self.abort_predicates:
+            t = predicate.threshold
+            predicate.threshold = (
+                t
+                if t is not None and t > 0
+                else k8s_event_watcher_config.get("default_threshold", 3)
+            )
+
+    def observe(self, payload: bytes) -> None:
+        """Observe a K8s event in JSON byte-string format"""
+        if self.abort_requested:
+            # do nothing since we have already requested Acto to abort
+            # the convergence wait
+            return
+
+        try:
+            event: dict = json.loads(payload.decode("utf-8"))
+            reason = event.get("object", {}).get("reason")
+            message = event.get("object", {}).get("message") or ""
+        except Exception:
+            self.logger.warning(
+                "Failed to deserialize K8s event from payload %s", str(payload)
+            )
+            return
+
+        for predicate in self.abort_predicates:
+            if predicate.match(reason, message):
+                involved_object = event["object"].get("involvedObject", {})
+                self.logger.info(
+                    "Observed K8s event matching abort predicate %s for object %s",
+                    predicate,
+                    str(involved_object),
+                )
+                object_id = involved_object.get("uid", "")
+
+                need_abort = self._inc_and_check(object_id, predicate)
+                if need_abort:
+                    self.logger.warning(
+                        "Aborting convergence wait due to failed predicate %s",
+                        predicate,
+                    )
+                    self.output_channel.put(self.ABORT)
+                    self.abort_requested = True
+                break
+
+    def _inc_and_check(self, object_id: str, predicate: Predicate) -> bool:
+        if predicate not in self.counter:
+            self.counter[predicate] = {}
+
+        if object_id not in self.counter[predicate]:
+            self.counter[predicate][object_id] = 0
+
+        self.counter[predicate][object_id] += 1
+        return self.counter[predicate][object_id] >= predicate.threshold
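Because the watcher deep-copies k8s_event_watcher_config at construction time, additional abort conditions can be registered by appending a Predicate before the Runner (and thus the watcher) is created. A sketch under that assumption; the FailedCreate reason and the quota keyword here are illustrative, not part of the shipped predicate list:

from acto.utils.k8s_event_watcher import Predicate, k8s_event_watcher_config

# Abort after two quota-related FailedCreate events for the same object.
k8s_event_watcher_config["abort_predicates"].append(
    Predicate(
        "FailedCreate",
        message_filter=lambda msg: "exceeded quota" in (msg or ""),
        threshold=2,
    )
)
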
diff --git a/test/e2e_tests/test_convergence_wait_abort.py b/test/e2e_tests/test_convergence_wait_abort.py
new file mode 100644
index 0000000000..a31056bf45
--- /dev/null
+++ b/test/e2e_tests/test_convergence_wait_abort.py
@@ -0,0 +1,124 @@
+"""This module tests the quick abort from the system convergence wait upon observing
+unresolvable errors in K8s events while deploying a test case"""
+
+import logging
+import os
+import pathlib
+import tempfile
+import unittest
+from typing import Callable
+
+from acto import utils
+from acto.kubernetes_engine.kind import Kind
+from acto.runner import Runner
+from acto.utils.k8s_event_watcher import k8s_event_watcher_config
+
+test_dir = pathlib.Path(__file__).parent.resolve()
+test_data_dir = os.path.join(test_dir, "test_data")
+
+
+class TestConvergenceWaitAbort(unittest.TestCase):
+    """Tests whether K8sEventWatcher correctly issues abort requests by examining log output"""
+
+    def __init__(self, methodName: str = "runTest") -> None:
+        super().__init__(methodName)
+        # lower the threshold for the sake of a faster test
+        k8s_event_watcher_config["default_threshold"] = 2
+
+    def test_unsatisfiable_affinity_rule(self):
+        """Should issue an abort when an affinity rule cannot be satisfied"""
+
+        def log_file_test(log_file_path) -> bool:
+            keyword = "Aborting convergence wait due to failed predicate (reason: FailedScheduling,"
+            with open(log_file_path, "r", encoding="utf-8") as log_file:
+                for log_line in log_file:
+                    if keyword in log_line:
+                        return True
+            return False
+
+        resource_manifest_path = os.path.join(
+            test_data_dir, "k8s-event-watcher", "unsatisfiable-affinity.yaml"
+        )
+        self._test_convergence_wait_abort(
+            "unsatisfiable-affinity", resource_manifest_path, log_file_test
+        )
+
+    def test_invalid_image(self):
+        """Should abort when detecting image pull errors"""
+
+        def log_file_test(log_file_path) -> bool:
+            keyword = "Aborting convergence wait due to failed predicate (reason: Failed,"
+            with open(log_file_path, "r", encoding="utf-8") as log_file:
+                for log_line in log_file:
+                    if keyword in log_line:
+                        return True
+            return False
+
+        resource_manifest_path = os.path.join(
+            test_data_dir, "k8s-event-watcher", "invalid-image.yaml"
+        )
+        self._test_convergence_wait_abort(
+            "invalid-image", resource_manifest_path, log_file_test
+        )
+
+    def test_satisfiable_deployment(self):
+        """Should never abort a convergence wait for satisfiable deployments"""
+
+        def log_file_test(log_file_path) -> bool:
+            keyword = "Aborting convergence"
+            with open(log_file_path, "r", encoding="utf-8") as log_file:
+                for log_line in log_file:
+                    if keyword in log_line:
+                        return False
+            return True
+
+        resource_manifest_path = os.path.join(
+            test_data_dir, "k8s-event-watcher", "satisfiable-deployment.yaml"
+        )
+        self._test_convergence_wait_abort(
+            "satisfiable", resource_manifest_path, log_file_test
+        )
+
+    def _test_convergence_wait_abort(
+        self,
+        cluster_name: str,
+        resource_file_path: str,
+        log_test_predicate: Callable[[str], bool],
+    ) -> None:
+        """Apply a resource manifest and examine the log file"""
+
+        tmp_dir = tempfile.TemporaryDirectory()
+        log_file_path = os.path.join(tmp_dir.name, "test.log")
+
+        logging.basicConfig(
+            filename=log_file_path,
+            level=logging.WARN,
+            format="%(message)s",
+            force=True,
+        )
+
+        kube_config_path = os.path.join(
+            os.path.expanduser("~"), ".kube/test-" + cluster_name
+        )
+
+        cluster = Kind(acto_namespace=0, num_nodes=3, version="v1.27.3")
+        cluster.create_cluster(cluster_name, kube_config_path)
+
+        runner = Runner(
+            context={
+                "namespace": "test",
+                "crd": None,
+                "preload_images": set(),
+            },
+            trial_dir=tmp_dir.name,
+            kubeconfig=kube_config_path,
+            context_name="kind-" + cluster_name,
+        )
+
+        utils.create_namespace(runner.apiclient, "test")
+
+        runner.run_without_collect(resource_file_path)
+        cluster.delete_cluster(cluster_name, kube_config_path)
+
+        assert log_test_predicate(log_file_path)
+        tmp_dir.cleanup()
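Each of these cases creates a real Kind cluster, so Docker and the kind binary must be available. Assuming pytest is the test runner (the unittest-style cases are pytest-compatible), a single case can be run with, for example:

python -m pytest test/e2e_tests/test_convergence_wait_abort.py -k test_invalid_image
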
diff --git a/test/e2e_tests/test_data/k8s-event-watcher/invalid-image.yaml b/test/e2e_tests/test_data/k8s-event-watcher/invalid-image.yaml
new file mode 100644
index 0000000000..2f76fc3f87
--- /dev/null
+++ b/test/e2e_tests/test_data/k8s-event-watcher/invalid-image.yaml
@@ -0,0 +1,30 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: busybox-deployment
+spec:
+  selector:
+    matchLabels:
+      app: busybox
+  replicas: 12
+  template:
+    metadata:
+      labels:
+        app: busybox
+    spec:
+      containers:
+        - name: busybox
+          image: strange-invalid-image
+          resources:
+            requests:
+              cpu: 1
+            limits:
+              cpu: 1
+          command:
+            - "sh"
+            - "-c"
+            - "while true; do sleep 3600; done"
\ No newline at end of file
diff --git a/test/e2e_tests/test_data/k8s-event-watcher/satisfiable-deployment.yaml b/test/e2e_tests/test_data/k8s-event-watcher/satisfiable-deployment.yaml
new file mode 100644
index 0000000000..3aff7f028f
--- /dev/null
+++ b/test/e2e_tests/test_data/k8s-event-watcher/satisfiable-deployment.yaml
@@ -0,0 +1,23 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: busybox-deployment
+spec:
+  selector:
+    matchLabels:
+      app: busybox
+  replicas: 3
+  template:
+    metadata:
+      labels:
+        app: busybox
+    spec:
+      containers:
+        - name: busybox
+          image: busybox
+          command:
+            - "sh"
+            - "-c"
+            - "while true; do sleep 3600; done"
\ No newline at end of file
diff --git a/test/e2e_tests/test_data/k8s-event-watcher/unsatisfiable-affinity.yaml b/test/e2e_tests/test_data/k8s-event-watcher/unsatisfiable-affinity.yaml
new file mode 100644
index 0000000000..f584b43371
--- /dev/null
+++ b/test/e2e_tests/test_data/k8s-event-watcher/unsatisfiable-affinity.yaml
@@ -0,0 +1,33 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: busybox-deployment
+spec:
+  selector:
+    matchLabels:
+      app: busybox
+  replicas: 5 # unsatisfiable anti-affinity rule when we have only 3 worker nodes
+  template:
+    metadata:
+      labels:
+        app: busybox
+    spec:
+      affinity:
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            - labelSelector:
+                matchExpressions:
+                  - key: app
+                    operator: In
+                    values:
+                      - busybox
+              topologyKey: kubernetes.io/hostname
+      containers:
+        - name: busybox
+          image: busybox
+          command:
+            - "sh"
+            - "-c"
+            - "while true; do sleep 3600; done"
\ No newline at end of file
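The anti-affinity manifest above is unsatisfiable by construction: requiredDuringSchedulingIgnoredDuringExecution with topologyKey kubernetes.io/hostname allows at most one busybox pod per node, so on the test's 3 worker nodes only 3 of the 5 replicas can schedule. The pending pods emit FailedScheduling events whose messages mention "affinity", which is the keyword the shipped FailedScheduling predicate filters for. On a live cluster the matching events can be inspected by hand (the test namespace here is the one the test helper creates):

kubectl get events -n test --field-selector reason=FailedScheduling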