diff --git a/.dockerignore b/.dockerignore index aaa5386..dfc37ef 100644 --- a/.dockerignore +++ b/.dockerignore @@ -6,4 +6,4 @@ build.sh .gitignore kubernetes/* venv/* -config_* +configd_* diff --git a/.gitignore b/.gitignore index 9c91466..0e78636 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,3 @@ venv/* *.log /config*.py /config*.ini -.idea diff --git a/.idea/workspace.xml b/.idea/workspace.xml deleted file mode 100644 index 5bf9507..0000000 --- a/.idea/workspace.xml +++ /dev/null @@ -1,357 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - { - "associatedIndex": 1 -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1619798041708 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index c363ad8..8144b3c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ -FROM python:3.11.0a3 -LABEL maintainer="ms-github@256bit.org" +FROM python:3.11 +LABEL maintainer="operations@vico-research.com" LABEL Description="zabbix-kubernetes - efficent kubernetes monitoring for zabbix" MAINTAINER operations@vico-research.com @@ -10,23 +10,23 @@ ENV ZABBIX_SERVER "zabbix" ENV ZABBIX_HOST "k8s" ENV CRYPTOGRAPHY_DONT_BUILD_RUST "1" -COPY --chown=nobody:users requirements.txt /app/requirements.txt +WORKDIR /app +COPY --chown=nobody:users Pipfile /app/ -RUN apt-get update -y && \ - apt-get install libffi-dev libffi7 libssl-dev bash screen ncdu -y && \ - pip3 install --upgrade pip && \ - pip3 install -r /app/requirements.txt && \ - apt-get upgrade -y && \ - apt-get dist-upgrade -y && \ - apt-get remove base libssl-dev libffi-dev gcc -y && \ - apt-get autoremove -y && \ - rm -rf /var/lib/apt/lists/* /root/.cache +RUN apt-get update -y +RUN apt-get upgrade -y +RUN apt-get dist-upgrade -y +RUN apt-get install libffi-dev libffi8 libssl-dev bash screen ncdu -y +RUN pip install --root-user-action=ignore --upgrade pip && pip install --root-user-action=ignore pipenv +RUN PIPENV_USE_SYSTEM=1 pipenv install --skip-lock --system +RUN apt-get remove base libssl-dev libffi-dev gcc -y +RUN apt-get autoremove -y +RUN rm -rf /var/lib/apt/lists/* /root/.cache COPY --chown=nobody:users base /app/base COPY --chown=nobody:users k8sobjects /app/k8sobjects COPY --chown=nobody:users check_kubernetesd /app/check_kubernetesd +COPY --chown=nobody:users config_default.ini /app/config_default.ini USER nobody -WORKDIR /app - ENTRYPOINT [ "/app/check_kubernetesd" ] diff --git a/Jenkinsfile b/Jenkinsfile index 0ac6578..ab37bcc 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -4,6 +4,7 @@ pipeline { agent any parameters { booleanParam(defaultValue: false, description: 'Create release', name: 'RELEASE') + string(name: 'REPO', defaultValue: '-', description: 'Repo for docker') } triggers { @@ -24,7 +25,7 @@ pipeline { steps { ansiColor('xterm') { sh 'git fetch --tags' - sh './build.sh cleanup' + sh "./build.sh cleanup ${params.TARGET}" } } } @@ -32,7 +33,7 @@ pipeline { steps { ansiColor('xterm') { sh 'git fetch --tags' - sh "./build.sh default vicoconsulting" + sh "./build.sh default ${params.TARGET}" } } } @@ -42,7 +43,7 @@ pipeline { } steps { ansiColor('xterm') { - sh './build.sh publish_image vicoconsulting' + sh "./build.sh publish_image ${params.TARGET}" } } } diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..125f49e --- /dev/null +++ b/Pipfile @@ -0,0 +1,22 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +kubernetes = "==21.7.0" +cryptography = "==36.0.1" +types-cryptography = "==3.3.10" +py-zabbix = "==1.1.7" +sentry-sdk = "==1.5.1" +adal = "==1.2.7" +urllib3 = "==1.26.7" +pytest = "==6.2.5" +mypy = "==0.930" +flake8 = "==4.0.1" +coloredlogs = "==15.0.1" + +[dev-packages] + +[requires] +python_version = "3.11" diff --git a/README.md b/README.md index 3098f0a..6beab05 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ The solution currently supervises the following types of Kubernetes entities: For details or a overview of the monitored kubernetes attributes, have a look at the [documentation](http://htmlpreview.github.io/?https://github.com/zabbix-tooling/k8s-zabbix/blob/master/documentation/template/custom_service_kubernetes.html) -The current docker image is published at https://hub.docker.com/repository/docker/scoopex666/k8s-zabbix/ +The current docker image is published at https://hub.docker.com/r/zabbixtooling/k8s-zabbix/ Architecture Details ===================== @@ -141,6 +141,20 @@ Production Deployment * "Monitoring" → "Latest data" → "Add Hosts": i.e. "k8s-prod-001" * Enable Option "Show items without data" → Button "Apply" +Configuration +===================== +All Configuration Options are available through ENV Variables +for a list of options check *config_default.py* + + * K8S_CONFIG_TYPE + * incluster + * use default kubeconfig + * kubeconfig + * load kubeconfig file from current user + * token + * use token auth + + Unix Signals ============ diff --git a/base/config.py b/base/config.py index 93e1811..5b74f23 100644 --- a/base/config.py +++ b/base/config.py @@ -19,7 +19,7 @@ class ClusterAccessConfigType(Enum): TOKEN = "token" -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") @dataclass(order=True) @@ -38,6 +38,8 @@ class Configuration: sentry_enabled: bool = False sentry_dsn: str = "" + container_crawling: str = "container" + zabbix_server: str = 'example.zabbix-server.com' zabbix_resources_exclude: list[str] = field(default_factory=lambda: []) zabbix_host: str = 'k8s-example-host' @@ -59,6 +61,11 @@ class Configuration: discovery_interval_slow: int = 60 * 60 * 2 resend_data_interval_slow: int = 60 * 30 + discovery_interval_delay: int = 120 + data_resend_interval_delay: int = 180 + + data_refresh_interval: int = 60 * 60 * 4 + def _convert_to_type(self, field_name: str, value: str | list[str] | bool | int | ClusterAccessConfigType) -> \ str | list[str] | bool | int | ClusterAccessConfigType: diff --git a/base/daemon_thread.py b/base/daemon_thread.py index 4741933..79c08d2 100644 --- a/base/daemon_thread.py +++ b/base/daemon_thread.py @@ -1,4 +1,5 @@ import logging +import json import re import signal import sys @@ -8,18 +9,21 @@ from datetime import datetime, timedelta from pprint import pformat -import kubernetes +from k8sobjects.k8sobject import K8sObject +from k8sobjects.k8sresourcemanager import K8sResourceManager +from k8sobjects.pvc import get_pvc_volumes_for_all_nodes +from k8sobjects.container import get_container_zabbix_metrics from kubernetes import client, watch from kubernetes import config as kube_config -from kubernetes.client import ApiClient, CoreV1Api, AppsV1Api, ApiextensionsV1Api -from pyzabbix import ZabbixMetric, ZabbixSender, ZabbixResponse +from kubernetes import watch +from kubernetes.client import (ApiClient, AppsV1Api, CoreV1Api, + ApiextensionsV1Api) +from pyzabbix import ZabbixMetric, ZabbixResponse, ZabbixSender -from base.config import Configuration, ClusterAccessConfigType +from base.config import ClusterAccessConfigType, Configuration from base.timed_threads import TimedThread from base.watcher_thread import WatcherThread -from k8sobjects.k8sobject import K8sObject -from k8sobjects.k8sresourcemanager import K8sResourceManager -from k8sobjects.pvc import get_pvc_volumes_for_all_nodes + from .web_api import WebApi exit_flag = threading.Event() @@ -40,15 +44,13 @@ def get_discovery_timeout_datetime() -> datetime: class KubernetesApi: - __shared_state = dict(core_v1=None, - apps_v1=None, - extensions_v1=None) + __shared_state = dict(core_v1=None, apps_v1=None, extensions_v1=None) def __init__(self, api_client: ApiClient): self.__dict__ = self.__shared_state - if not getattr(self, 'core_v1', None): + if not getattr(self, "core_v1", None): self.core_v1 = client.CoreV1Api(api_client) - if not getattr(self, 'apps_v1', None): + if not getattr(self, "apps_v1", None): self.apps_v1 = client.AppsV1Api(api_client) if not getattr(self, 'extensions_v1', None): self.extensions_v1 = client.ApiextensionsV1Api(api_client) @@ -58,16 +60,20 @@ class CheckKubernetesDaemon: data: dict[str, K8sResourceManager] = {} discovery_sent: dict[str, datetime] = {} thread_lock = threading.Lock() + data_refreshed: dict[str, datetime] = {} def __init__(self, config: Configuration, resources: list[str], - discovery_interval: int, data_resend_interval: int, + discovery_interval: int, + data_resend_interval: int, + data_refresh_interval: int, ): self.manage_threads: list[TimedThread | WatcherThread] = [] - self.config = config - self.logger = logging.getLogger(__file__) + self.config: Configuration = config + self.logger = logging.getLogger("k8s-zabbix") self.discovery_interval = int(discovery_interval) self.data_resend_interval = int(data_resend_interval) + self.data_refresh_interval = int(data_refresh_interval) self.api_zabbix_interval = 60 self.rate_limit_seconds = 30 @@ -91,13 +97,15 @@ def __init__(self, config: Configuration, self.logger.info(f"Initialized cluster access for {config.k8s_config_type}") # K8S API self.debug_k8s_events = False - self.core_v1 = KubernetesApi(self.api_client).core_v1 - self.apps_v1 = KubernetesApi(self.api_client).apps_v1 - self.extensions_v1 = KubernetesApi(self.api_client).extensions_v1 + self.apis = { + 'core_v1': KubernetesApi(self.api_client).core_v1, + 'apps_v1': KubernetesApi(self.api_client).apps_v1, + 'extensions_v1': KubernetesApi(self.api_client).extensions_v1 + } self.zabbix_sender = ZabbixSender(zabbix_server=config.zabbix_server) self.zabbix_resources = CheckKubernetesDaemon.exclude_resources(resources, - self.config.zabbix_resources_exclude) + config.zabbix_resources_exclude) self.zabbix_host = config.zabbix_host self.zabbix_debug = config.zabbix_debug self.zabbix_single_debug = config.zabbix_single_debug @@ -106,14 +114,14 @@ def __init__(self, config: Configuration, self.web_api = None self.web_api_enable = config.web_api_enable self.web_api_resources = CheckKubernetesDaemon.exclude_resources(resources, - self.config.web_api_resources_exclude) + config.web_api_resources_exclude) self.web_api_host = config.web_api_host self.web_api_token = config.web_api_token self.web_api_cluster = config.web_api_cluster self.web_api_verify_ssl = config.web_api_verify_ssl - self.resources = CheckKubernetesDaemon.exclude_resources(resources, self.config.resources_exclude) + self.resources = CheckKubernetesDaemon.exclude_resources(resources, config.resources_exclude) self.logger.info(f"Init K8S-ZABBIX Watcher for resources: {','.join(self.resources)}") self.logger.info(f"Zabbix Host: {self.zabbix_host} / Zabbix Proxy or Server: {config.zabbix_server}") @@ -130,11 +138,11 @@ def exclude_resources(available_types: list[str], excluded_types: list[str]) -> def handler(self, signum: int, *args: str) -> None: if signum in [signal.SIGTERM]: - self.logger.info('Signal handler called with signal %s... stopping (max %s seconds)' % (signum, 3)) + self.logger.info("Signal handler called with signal %s... stopping (max %s seconds)" % (signum, 3)) exit_flag.set() for thread in self.manage_threads: thread.join(timeout=3) - self.logger.info('All threads exited... exit check_kubernetesd') + self.logger.info("All threads exited... exit check_kubernetesd") sys.exit(0) elif signum in [signal.SIGUSR1]: self.logger.info('=== Listing count of data hold in CheckKubernetesDaemon.data ===') @@ -142,8 +150,7 @@ def handler(self, signum: int, *args: str) -> None: for r, d in self.data.items(): for obj_name, obj_d in d.objects.items(): self.logger.info( - f"resource={r}, last_sent_zabbix={obj_d.last_sent_zabbix}, " + - f"last_sent_web={obj_d.last_sent_web}" + f"resource={r}, [{obj_name}], last_sent_zabbix={obj_d.last_sent_zabbix}, " + f"last_sent_web={obj_d.last_sent_web}" ) for resource_discovered, resource_discovered_time in self.discovery_sent.items(): self.logger.info( @@ -162,38 +169,33 @@ def run(self) -> None: self.start_loop_send_discovery_threads() self.start_resend_threads() + def excepthook(self, args): + self.logger.exception(f"Thread '{self.resources}' failed: {args.exc_value}") + def start_data_threads(self) -> None: thread: WatcherThread | TimedThread + threading.excepthook = self.excepthook for resource in self.resources: with self.thread_lock: - self.data.setdefault(resource, K8sResourceManager(resource, zabbix_host=self.zabbix_host)) - if resource == 'pods': - self.data.setdefault('containers', K8sResourceManager('containers')) - - # watcher threads - if resource == 'containers': - pass - elif resource == 'components': - thread = TimedThread(resource, self.data_resend_interval, exit_flag, - daemon_object=self, daemon_method='watch_data') - self.manage_threads.append(thread) - thread.start() - elif resource == 'pvcs': - thread = TimedThread(resource, self.data_resend_interval, exit_flag, - daemon_object=self, daemon_method='watch_data') - self.manage_threads.append(thread) - thread.start() - # additional looping data threads - elif resource == 'services': + self.data.setdefault(resource, K8sResourceManager(resource, + apis=self.apis, + zabbix_host=self.zabbix_host, + config=self.config)) + if resource == "pods": + # additional containers coming from pods + self.data.setdefault("containers", K8sResourceManager("containers", + config=self.config)) + + if resource in ['containers', 'services']: thread = TimedThread(resource, self.data_resend_interval, exit_flag, daemon_object=self, daemon_method='report_global_data_zabbix', + delay_first_run=True, delay_first_run_seconds=self.discovery_interval + 5) self.manage_threads.append(thread) thread.start() - elif resource == 'containers': + elif resource in ['components', 'pvcs']: thread = TimedThread(resource, self.data_resend_interval, exit_flag, - daemon_object=self, daemon_method='report_global_data_zabbix', - delay_first_run_seconds=self.discovery_interval + 5) + daemon_object=self, daemon_method='watch_data') self.manage_threads.append(thread) thread.start() else: @@ -214,10 +216,14 @@ def start_api_info_threads(self) -> None: def start_loop_send_discovery_threads(self) -> None: for resource in self.resources: + if resource == 'containers': + # skip containers as discovery is done by pods + continue + send_discovery_thread = TimedThread(resource, self.discovery_interval, exit_flag, - daemon_object=self, daemon_method='send_zabbix_discovery', + daemon_object=self, daemon_method='update_discovery', delay_first_run=True, - delay_first_run_seconds=30) + delay_first_run_seconds=self.config.discovery_interval_delay) self.manage_threads.append(send_discovery_thread) send_discovery_thread.start() @@ -226,7 +232,7 @@ def start_resend_threads(self) -> None: resend_thread = TimedThread(resource, self.data_resend_interval, exit_flag, daemon_object=self, daemon_method='resend_data', delay_first_run=True, - delay_first_run_seconds=60, + delay_first_run_seconds=self.config.data_resend_interval_delay, ) self.manage_threads.append(resend_thread) resend_thread.start() @@ -248,7 +254,7 @@ def get_web_api(self) -> WebApi: return self._web_api def watch_data(self, resource: str) -> None: - api = self.get_api_for_resource(resource) + api = self.data[resource].api stream_named_arguments = {"timeout_seconds": self.config.k8s_api_stream_timeout_seconds} request_named_arguments = {"_request_timeout": self.config.k8s_api_request_timeout_seconds} self.logger.info( @@ -271,7 +277,7 @@ def watch_data(self, resource: str) -> None: elif resource == 'statefulsets': for obj in w.stream(api.list_stateful_set_for_all_namespaces, **stream_named_arguments): self.watch_event_handler(resource, obj) - elif resource == 'components': + elif resource == "components": # The api does not support watching on component status with self.thread_lock: for obj in api.list_component_status(watch=False, **request_named_arguments).to_dict().get('items'): @@ -324,21 +330,13 @@ def watch_event_handler(self, resource: str, event: dict) -> None: self.logger.error('Could not add watch_event_handler! No resource_class for "%s"' % resource) return - if event_type.lower() == 'added': + if event_type.lower() in ['added', 'modified']: with self.thread_lock: resourced_obj = self.data[resource].add_obj_from_data(obj) - - if resourced_obj and (resourced_obj.is_dirty_zabbix or resourced_obj.is_dirty_web): - self.send_object(resource, resourced_obj, event_type, - send_zabbix_data=resourced_obj.is_dirty_zabbix, - send_web=resourced_obj.is_dirty_web) - elif event_type.lower() == 'modified': - with self.thread_lock: - resourced_obj = self.data[resource].add_obj_from_data(obj) - if resourced_obj and (resourced_obj.is_dirty_zabbix or resourced_obj.is_dirty_web): - self.send_object(resource, resourced_obj, event_type, - send_zabbix_data=resourced_obj.is_dirty_zabbix, - send_web=resourced_obj.is_dirty_web) + if resourced_obj and (resourced_obj.is_dirty_zabbix or resourced_obj.is_dirty_web): + self.send_object(resource, resourced_obj, event_type, + send_zabbix_data=resourced_obj.is_dirty_zabbix, + send_web=resourced_obj.is_dirty_web) elif event_type.lower() == 'deleted': with self.thread_lock: resourced_obj = self.data[resource].del_obj(obj) @@ -346,22 +344,23 @@ def watch_event_handler(self, resource: str, event: dict) -> None: self.delete_object(resource, resourced_obj) else: self.logger.info('event type "%s" not implemented' % event_type) + self.logger.debug(f'watch_event_handler[{resource}] finished') def report_global_data_zabbix(self, resource: str) -> None: """ aggregate and report information for some speciality in resources """ if resource not in self.discovery_sent: - self.logger.debug('skipping report_global_data_zabbix for %s, discovery not send yet!' % resource) + self.logger.info('skipping report_global_data_zabbix for %s, discovery not send yet!' % resource) return data_to_send = list() - if resource == 'services': + if resource == "services": num_services = 0 num_ingress_services = 0 with self.thread_lock: for obj_uid, resourced_obj in self.data[resource].objects.items(): num_services += 1 - if resourced_obj.resource_data['is_ingress']: + if resourced_obj.resource_data["is_ingress"]: num_ingress_services += 1 data_to_send.append( @@ -372,47 +371,47 @@ def report_global_data_zabbix(self, resource: str) -> None: str(num_ingress_services))) self.send_data_to_zabbix(resource, None, data_to_send) - # TODO: disabled, rewrite later - # elif resource == 'containers': - # # aggregate pod data to containers for each namespace - # with self.thread_lock: - # containers: dict[str, dict[str, dict[str, object]]] = dict() - # for obj_uid, resourced_obj in self.data['pods'].objects.items(): - # if resourced_obj.name_space is None: - # continue - # if not isinstance(resourced_obj, Pod): - # continue - # ns = resourced_obj.name_space - # containers.setdefault(resourced_obj.name_space, dict()) - # - # pod_data = resourced_obj.resource_data - # pod_base_name = resourced_obj.base_name - # try: - # container_status: dict[str, dict[str, object]] = json.loads(pod_data['container_status']) - # except Exception as e: - # self.logger.error(e) - # continue - # - # # aggregate container information - # for container_name, container_data in container_status.items(): - # containers[ns].setdefault(pod_base_name, dict()) - # containers[ns][pod_base_name].setdefault(container_name, container_data) - # - # for k, v in containers[ns][pod_base_name][container_name].items(): - # if isinstance(v, int): - # containers[ns][pod_base_name][container_name][k] += container_data[k] - # elif k == 'status' and container_data[k].startswith('ERROR'): - # containers[ns][pod_base_name][container_name][k] = container_data[k] - # - # for ns, d1 in containers.items(): - # for pod_base_name, d2 in d1.items(): - # for container_name, container_data in d2.items(): - # data_to_send += get_container_zabbix_metrics(self.zabbix_host, ns, pod_base_name, - # container_name, container_data) - # - # self.send_data_to_zabbix(resource, None, data_to_send) + elif resource == "containers": + # aggregate pod data to containers for each namespace + with self.thread_lock: + containers = dict() + for obj_uid, resourced_obj in self.data["pods"].objects.items(): + ns = resourced_obj.name_space + if ns not in containers: + containers[ns] = dict() + + pod_data = resourced_obj.resource_data + pod_base_name = resourced_obj.base_name + try: + container_status = json.loads(pod_data["container_status"]) + except Exception as e: + self.logger.error(e) + continue + + # aggregate container information + for container_name, container_data in container_status.items(): + containers[ns].setdefault(pod_base_name, dict()) + if container_name not in containers[ns][pod_base_name]: + containers[ns][pod_base_name].setdefault(container_name, container_data) + else: + for k, v in containers[ns][pod_base_name][container_name].items(): + if isinstance(v, int): + containers[ns][pod_base_name][container_name][k] += container_data[k] + elif k == "status" and container_data[k].startswith("ERROR"): + containers[ns][pod_base_name][container_name][k] = container_data[k] + # self.logger.debug("%s %s %s" % (resourced_obj.name, container_name, containers[ns][pod_base_name][container_name])) + for ns, d1 in containers.items(): + for pod_base_name, d2 in d1.items(): + for container_name, container_data in d2.items(): + data_to_send += get_container_zabbix_metrics( + self.zabbix_host, ns, pod_base_name, container_name, container_data + ) + + self.send_data_to_zabbix(resource, None, data_to_send) def resend_data(self, resource: str) -> None: + if resource == 'containers': + return with self.thread_lock: try: @@ -424,11 +423,15 @@ def resend_data(self, resource: str) -> None: # Zabbix for obj_uid, obj in self.data[resource].objects.items(): zabbix_send = False - if resource in self.discovery_sent: - zabbix_send = True + if resource in self.discovery_sent and obj.added > self.discovery_sent[resource]: + self.logger.info( + f'skipping resend of {obj}, resource {resource} discovery_sent "{self.discovery_sent[resource].isoformat()}"' + f' is older than {obj.added.isoformat()}') elif obj.last_sent_zabbix < (datetime.now() - timedelta(seconds=self.data_resend_interval)): - self.logger.debug("resend zabbix : %s - %s/%s data because its outdated" % ( - resource, obj.name_space, obj.name)) + self.logger.debug( + "resend zabbix : %s - %s/%s data because its outdated" + % (resource, obj.name_space, obj.name) + ) zabbix_send = True if zabbix_send: metrics += obj.get_zabbix_metrics() @@ -437,8 +440,9 @@ def resend_data(self, resource: str) -> None: if len(metrics) > 0: if resource not in self.discovery_sent: self.logger.debug( - 'skipping resend_data zabbix , discovery for %s - %s/%s not sent yet!' % ( - resource, obj.name_space, obj.name)) + "skipping resend_data zabbix , discovery for %s - %s/%s not sent yet!" + % (resource, obj.name_space, obj.name) + ) else: self.send_data_to_zabbix(resource, metrics=metrics) @@ -446,14 +450,14 @@ def resend_data(self, resource: str) -> None: for obj_uid, obj in self.data[resource].objects.items(): if obj.is_dirty_web: if obj.is_unsubmitted_web(): - self.send_to_web_api(resource, obj, 'ADDED') + self.send_to_web_api(resource, obj, "ADDED") else: - self.send_to_web_api(resource, obj, 'MODIFIED') + self.send_to_web_api(resource, obj, "MODIFIED") else: if obj.is_unsubmitted_web(): - self.send_to_web_api(resource, obj, 'ADDED') + self.send_to_web_api(resource, obj, "ADDED") elif obj.last_sent_web < (datetime.now() - timedelta(seconds=self.data_resend_interval)): - self.send_to_web_api(resource, obj, 'MODIFIED') + self.send_to_web_api(resource, obj, "MODIFIED") self.logger.debug("resend web : %s/%s data because its outdated" % (resource, obj.name)) obj.last_sent_web = datetime.now() obj.is_dirty_web = False @@ -461,56 +465,87 @@ def resend_data(self, resource: str) -> None: self.logger.warning(str(e)) def delete_object(self, resource_type: str, resourced_obj: K8sObject) -> None: - # TODO: trigger zabbix discovery, srsly? self.send_to_web_api(resource_type, resourced_obj, "deleted") + def update_discovery(self, resource: str) -> None: + """ Update elements on hold and send to zabbix """ + resource_obj = self.data[resource].resource_meta + with (self.thread_lock): + self.logger.debug(f"update_discovery[{resource}]: got thread_lock") + if resource in self.data_refreshed \ + and self.data_refreshed[resource] < (datetime.now() - timedelta(seconds=self.data_refresh_interval)) \ + or resource not in self.data_refreshed: + obj_uid_list, obj_data_list = resource_obj.get_uid_list_and_data() + obj_uid_list_len = len(obj_uid_list) + self.logger.info(f"refreshing [{resource}] uid_list + data and check for orphans: {obj_uid_list_len}") + if resource in self.data_refreshed: + self.logger.info(f"last refresh: {self.data_refreshed[resource]}") + + # copy dict to delete in it + for obj_uid in self.data[resource].objects.copy(): + if obj_uid not in obj_uid_list: + self.logger.info(f"NOT finding [{resource}]{obj_uid} anymore -> removing") + self.data[resource].del_obj(obj_uid) + else: + # update obj information + self.data[resource].add_obj(obj_data_list[obj_uid]) + + self.data_refreshed[resource] = datetime.now() + self.send_zabbix_discovery(resource) + def send_zabbix_discovery(self, resource: str) -> None: # aggregate data and send to zabbix - self.logger.info(f"send_zabbix_discovery: {resource}") - with self.thread_lock: - if resource not in self.data: - self.logger.warning('send_zabbix_discovery: resource "%s" not in self.data... skipping!' % resource) - return + next_run = datetime.now() + timedelta(seconds=self.discovery_interval) + self.logger.info(f"send_zabbix_discovery: {resource}, next run: {next_run.isoformat()}") - data = list() - for obj_uid, obj in self.data[resource].objects.items(): - data += obj.get_zabbix_discovery_data() + if resource not in self.data: + self.logger.warning('send_zabbix_discovery: resource "%s" not in self.data... skipping!' % resource) + return - if data: - metric = obj.get_discovery_for_zabbix(data) - self.logger.debug('send_zabbix_discovery: resource "%s": %s' % (resource, metric)) - self.send_discovery_to_zabbix(resource, metric=metric) - else: - self.logger.warning('send_zabbix_discovery: resource "%s" has no discovery data' % resource) + data = list() + for obj_uid, obj in self.data[resource].objects.items(): + data += obj.get_zabbix_discovery_data() - self.discovery_sent[resource] = datetime.now() + if data: + metric = obj.get_discovery_for_zabbix(data) + self.logger.debug('send_zabbix_discovery: resource "%s": %s' % (resource, metric)) + self.send_discovery_to_zabbix(resource, metric=metric) + else: + self.logger.warning('send_zabbix_discovery: resource "%s" has no discovery data' % resource) + + self.discovery_sent[resource] = datetime.now() + if resource == 'pods' and self.config.container_crawling == 'container': + self.discovery_sent['containers'] = datetime.now() def send_object(self, resource: str, resourced_obj: K8sObject, event_type: str, send_zabbix_data: bool = False, send_web: bool = False) -> None: # send single object for updates - with self.thread_lock: - if send_zabbix_data: - if resourced_obj.last_sent_zabbix < datetime.now() - timedelta(seconds=self.rate_limit_seconds): - self.send_data_to_zabbix(resource, obj=resourced_obj) - resourced_obj.last_sent_zabbix = datetime.now() - resourced_obj.is_dirty_zabbix = False - else: - self.logger.debug('obj >>>type: %s, name: %s/%s<<< not sending to zabbix! rate limited (%is)' % ( - resource, resourced_obj.name_space, resourced_obj.name, self.rate_limit_seconds)) - resourced_obj.is_dirty_zabbix = True - - if send_web: - if resourced_obj.last_sent_web < datetime.now() - timedelta(seconds=self.rate_limit_seconds): - self.send_to_web_api(resource, resourced_obj, event_type) - resourced_obj.last_sent_web = datetime.now() - if resourced_obj.is_dirty_web is True and not send_zabbix_data: - # only set dirty False if send_to_web_api worked - resourced_obj.is_dirty_web = False - else: - self.logger.debug('obj >>>type: %s, name: %s/%s<<< not sending to web! rate limited (%is)' % ( - resource, resourced_obj.name_space, resourced_obj.name, self.rate_limit_seconds)) - resourced_obj.is_dirty_web = True + if send_zabbix_data: + if resourced_obj.last_sent_zabbix < datetime.now() - timedelta(seconds=self.rate_limit_seconds): + self.send_data_to_zabbix(resource, obj=resourced_obj) + resourced_obj.last_sent_zabbix = datetime.now() + resourced_obj.is_dirty_zabbix = False + else: + self.logger.debug( + "obj >>>type: %s, name: %s/%s<<< not sending to zabbix! rate limited (%is)" + % (resource, resourced_obj.name_space, resourced_obj.name, self.rate_limit_seconds) + ) + resourced_obj.is_dirty_zabbix = True + + if send_web: + if resourced_obj.last_sent_web < datetime.now() - timedelta(seconds=self.rate_limit_seconds): + self.send_to_web_api(resource, resourced_obj, event_type) + resourced_obj.last_sent_web = datetime.now() + if resourced_obj.is_dirty_web is True and not send_zabbix_data: + # only set dirty False if send_to_web_api worked + resourced_obj.is_dirty_web = False + else: + self.logger.debug( + "obj >>>type: %s, name: %s/%s<<< not sending to web! rate limited (%is)" + % (resource, resourced_obj.name_space, resourced_obj.name, self.rate_limit_seconds) + ) + resourced_obj.is_dirty_web = True def send_heartbeat_info(self, resource: str) -> None: result = self.send_to_zabbix([ @@ -540,7 +575,7 @@ def send_to_zabbix(self, metrics: list[ZabbixMetric]) -> ZabbixResponse | DryRes self.logger.info('===> Sending to zabbix: >>>%s<<<' % metrics) return result - def send_discovery_to_zabbix(self, resource: str, metric: ZabbixMetric = None, + def send_discovery_to_zabbix(self, resource: str, metric: ZabbixMetric | list = None, obj: K8sObject | None = None) -> None: if resource not in self.zabbix_resources: self.logger.warning( @@ -556,20 +591,35 @@ def send_discovery_to_zabbix(self, resource: str, metric: ZabbixMetric = None, discovery_key = 'check_kubernetesd[discover,' + resource + ']' result = self.send_to_zabbix([ZabbixMetric(host=self.zabbix_host, key=discovery_key, value=discovery_data)]) if result.failed > 0: - self.logger.error("failed to sent zabbix discovery: %s : >>>%s<<<" % (discovery_key, discovery_data)) + self.logger.error("failed to send zabbix discovery: %s : >>>%s<<<" % (discovery_key, discovery_data)) elif self.zabbix_debug: self.logger.info("successfully sent zabbix discovery: %s >>>>%s<<<" % (discovery_key, discovery_data)) elif metric: - result = self.send_to_zabbix([metric]) + if isinstance(metric, list): + result = self.send_to_zabbix(metric) + else: + result = self.send_to_zabbix([metric]) if result.failed > 0: - self.logger.error("failed to sent mass zabbix discovery: >>>%s<<<" % metric) + self.logger.error("failed to send mass zabbix discovery: >>>%s<<<" % metric) elif self.zabbix_debug: self.logger.info("successfully sent mass zabbix discovery: >>>%s<<<" % metric) else: - self.logger.warning('No obj or metrics found for send_discovery_to_zabbix [%s]' % resource) + self.logger.warning("No obj or metrics found for send_discovery_to_zabbix [%s]" % resource) def send_data_to_zabbix(self, resource: str, obj: K8sObject | None = None, metrics: list[ZabbixMetric] | None = None) -> None: + + if resource not in self.discovery_sent: + self.logger.info('skipping send_data_to_zabbix for %s, discovery not send yet!' % resource) + return + elif obj and obj.added > self.discovery_sent[resource]: + self.logger.info( + f'skipping send of {obj}, resource {resource} discovery_sent "{self.discovery_sent[resource]}" ' + f'is older than obj: {obj.added.isoformat()}') + return + else: + self.logger.info(f'sending data for "{resource}" to zabbix') + if metrics is None: metrics = list() if resource not in self.zabbix_resources: @@ -579,28 +629,33 @@ def send_data_to_zabbix(self, resource: str, obj: K8sObject | None = None, metrics = obj.get_zabbix_metrics() if len(metrics) == 0 and obj: - self.logger.debug('No zabbix metrics to send for %s: %s' % (obj.uid, metrics)) + self.logger.debug("No zabbix metrics to send for %s: %s" % (obj.uid, metrics)) return elif len(metrics) == 0: - self.logger.debug('No zabbix metrics or no obj found for [%s]' % resource) + self.logger.debug("No zabbix metrics or no obj found for [%s]" % resource) return if self.zabbix_single_debug: for metric in metrics: result = self.send_to_zabbix([metric]) + self.logger.debug("Failed metrics: %s" % (result)) if result.failed > 0: - self.logger.error("failed to sent zabbix items: %s", metric) + self.logger.error("failed to send zabbix items: %s", metric) else: self.logger.info("successfully sent zabbix items: %s", metric) else: result = self.send_to_zabbix(metrics) if result.failed > 0: - self.logger.error("failed to sent %s zabbix items, processed %s items [%s: %s]" - % (result.failed, result.processed, resource, obj.name if obj else 'metrics')) - self.logger.debug(metrics) + self.logger.error( + "failed to send %s zabbix items, processed %s items [%s: %s]" + % (result.failed, result.processed, resource, obj.name if obj else "metrics") + ) + self.logger.debug("Result: %s" % (result)) else: - self.logger.debug("successfully sent %s zabbix items [%s: %s]" % ( - len(metrics), resource, obj.name if obj else 'metrics')) + self.logger.debug( + "successfully sent %s zabbix items [%s: %s]" + % (len(metrics), resource, obj.name if obj else "metrics") + ) def send_to_web_api(self, resource: str, obj: K8sObject, action: str) -> None: if resource not in self.web_api_resources: @@ -609,7 +664,7 @@ def send_to_web_api(self, resource: str, obj: K8sObject, action: str) -> None: if self.web_api_enable: api = self.get_web_api() data_to_send = obj.resource_data - data_to_send['cluster'] = self.web_api_cluster + data_to_send["cluster"] = self.web_api_cluster api.send_data(resource, data_to_send, action) else: diff --git a/base/timed_threads.py b/base/timed_threads.py index 78a8959..6155dd5 100644 --- a/base/timed_threads.py +++ b/base/timed_threads.py @@ -1,7 +1,6 @@ import logging import threading import time - from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -13,7 +12,6 @@ class TimedThread(threading.Thread): restart_thread = False daemon = True - # TODO: change default of delay_first_run_seconds to 120 seconds def __init__(self, resource: str, interval: int, exit_flag: threading.Event, daemon_object: 'CheckKubernetesDaemon', @@ -28,7 +26,7 @@ def __init__(self, resource: str, interval: int, self.delay_first_run = delay_first_run self.delay_first_run_seconds = delay_first_run_seconds threading.Thread.__init__(self, target=self.run) - self.logger = logging.getLogger(__file__) + self.logger = logging.getLogger("k8s-zabbix") def stop(self) -> None: self.logger.info('OK: Thread "' + self.resource + '" is stopping"') @@ -36,6 +34,7 @@ def stop(self) -> None: def run(self) -> None: # manage first run + self.logger.info('[start thread|timed] %s -> %s' % (self.resource, self.daemon_method)) if self.delay_first_run: self.logger.info( '%s -> %s | delaying first run by %is [interval %is]' % @@ -47,6 +46,11 @@ def run(self) -> None: self.run_requests(first_run=True) except Exception as e: self.logger.exception(e) + else: + try: + self.run_requests(first_run=True) + except Exception as e: + self.logger.exception(e) # manage timed runs while not self.exit_flag.wait(self.cycle_interval_seconds): @@ -60,7 +64,7 @@ def run(self) -> None: ) time.sleep(self.cycle_interval_seconds) - self.logger.info('terminating looprun thread %s.%s' % (self.resource, self.daemon_method)) + self.logger.info("terminating looprun thread %s.%s" % (self.resource, self.daemon_method)) def run_requests(self, first_run: bool = False) -> None: if first_run: diff --git a/base/watcher_thread.py b/base/watcher_thread.py index bf3ace8..ee9069e 100644 --- a/base/watcher_thread.py +++ b/base/watcher_thread.py @@ -1,10 +1,9 @@ import logging import threading +from typing import TYPE_CHECKING from urllib3.exceptions import ProtocolError -from typing import TYPE_CHECKING - if TYPE_CHECKING: from base.daemon_thread import CheckKubernetesDaemon @@ -22,7 +21,7 @@ def __init__(self, resource: str, exit_flag: threading.Event, self.daemon_object = daemon_object self.daemon_method = daemon_method threading.Thread.__init__(self, target=self.run) - self.logger = logging.getLogger(__file__) + self.logger = logging.getLogger("k8s-zabbix") def stop(self) -> None: self.logger.info('OK: Thread "' + self.resource + '" is stopping"') @@ -33,5 +32,6 @@ def run(self) -> None: try: getattr(self.daemon_object, self.daemon_method)(self.resource) except (ProtocolError, ConnectionError) as e: - self.logger.error(e) + self.logger.error("[exception thread|watch] %s -> %s: %s" % (self.resource, self.daemon_method, str(e))) + # self.logger.error(e) self.restart_thread = True diff --git a/base/web_api.py b/base/web_api.py index a5c8216..43a9a63 100644 --- a/base/web_api.py +++ b/base/web_api.py @@ -1,9 +1,10 @@ -import requests import logging +import requests + from k8sobjects.k8sobject import K8S_RESOURCES -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class WebApi: @@ -15,12 +16,12 @@ def __init__(self, api_host: str, api_token: str, verify_ssl: bool = True): url = self.get_url() r = requests.head(url) if r.status_code in [301, 302]: - self.api_host = r.headers['location'] + self.api_host = r.headers["location"] def get_headers(self): return { - 'Authorization': self.api_token, - 'User-Agent': 'k8s-zabbix agent', + "Authorization": self.api_token, + "User-Agent": "k8s-zabbix agent", } def get_url(self, resource=None, path_append=""): @@ -29,22 +30,22 @@ def get_url(self, resource=None, path_append=""): api_resource = K8S_RESOURCES[resource] url = self.api_host - if not url.endswith('/'): - url += '/' + if not url.endswith("/"): + url += "/" if not api_resource: return url - return url + api_resource + '/' + path_append + return url + api_resource + "/" + path_append def send_data(self, resource: str, data: dict[str, str], action: str) -> None: path_append = "" - if action.lower() == 'added': + if action.lower() == "added": func = requests.post - elif action.lower() == 'modified': + elif action.lower() == "modified": func = requests.put - elif action.lower() == 'deleted': + elif action.lower() == "deleted": func = requests.delete - if 'name_space' in data and data["name_space"]: + if "name_space" in data and data["name_space"]: path_append = "%s/%s/%s/" % ( data["cluster"], data["name_space"], @@ -62,16 +63,16 @@ def send_data(self, resource: str, data: dict[str, str], action: str) -> None: url = self.get_url(resource, path_append) # empty variables are NOT sent! - r = func(url, - data=data, - headers=self.get_headers(), - verify=self.verify_ssl, - allow_redirects=True) + r = func(url, data=data, headers=self.get_headers(), verify=self.verify_ssl, allow_redirects=True) if r.status_code > 399: - logger.warning('%s [%s] %s sended %s but failed data >>>%s<<< (%s)' % ( - self.api_host, r.status_code, url, resource, data, action)) + logger.warning( + "%s [%s] %s sended %s but failed data >>>%s<<< (%s)" + % (self.api_host, r.status_code, url, resource, data, action) + ) logger.warning(r.text) else: - logger.debug('%s [%s] %s sucessfully sended %s >>>%s<<< (%s)' % ( - self.api_host, r.status_code, url, resource, data, action)) + logger.debug( + "%s [%s] %s sucessfully sended %s >>>%s<<< (%s)" + % (self.api_host, r.status_code, url, resource, data, action) + ) diff --git a/build.sh b/build.sh index 8f0061c..8a3a162 100755 --- a/build.sh +++ b/build.sh @@ -3,23 +3,23 @@ #################################################################### ## Helpers -notice(){ - echo -e "\e[1;32m$1\e[0m" +notice() { + echo -e "\e[1;32m$1\e[0m" } # Parameter: # 1: cmd # Execute simple shell command, exit if errorcode of shell command != 0 -exec_cmd(){ - local CMD="$1" - echo "+ $CMD" - eval "$CMD 2>&1" - local RET="$?" - if [ "$RET" != "0" ];then - echo "ERROR: execution failed (returncode $RET)" - exit 2 - fi - return 0 +exec_cmd() { + local CMD="$1" + echo "+ $CMD" + eval "$CMD 2>&1" + local RET="$?" + if [ "$RET" != "0" ]; then + echo "ERROR: execution failed (returncode $RET)" + exit 2 + fi + return 0 } get_env(){ @@ -43,21 +43,21 @@ BDIR="$(dirname $(readlink -f $0))" cd $BDIR || exit 1 # PHASES -build_image(){ - if [ -z "$VERSION" ];then - echo "ERROR: no git release tag available" - exit 1 - fi - if [ "$DOCKER_SQUASH" == "true" ];then - SQUASH_OPT="--squash" - notice "Squashing of image is enabled, you can disable that by 'export DOCKER_SQUASH=false'" - else - SQUASH_OPT="" - fi - - exec_cmd "docker build $SQUASH_OPT -t ${IMAGE_BASE} -f Dockerfile ." - SIZE="$(docker inspect $IMAGE_BASE --format='{{.Size}}')" - notice "Image size $(( $SIZE / 1024 / 1024 ))MB" +build_image() { + if [ -z "$VERSION" ]; then + echo "ERROR: no git release tag available" + exit 1 + fi + if [ "$DOCKER_SQUASH" == "true" ]; then + SQUASH_OPT="--squash" + notice "Squashing of image is enabled, you can disable that by 'export DOCKER_SQUASH=false'" + else + SQUASH_OPT="" + fi + + exec_cmd "docker build $SQUASH_OPT -t ${IMAGE_BASE} -f Dockerfile ." + SIZE="$(docker inspect $IMAGE_BASE --format='{{.Size}}')" + notice "Image size $(($SIZE / 1024 / 1024))MB" } test_container(){ @@ -79,44 +79,42 @@ inspect(){ exec_cmd "docker run -ti --rm $(get_env) --name $IDENT ${IMAGE_BASE} /bin/sh" } - -cleanup(){ - exec_cmd "rm -rf /tmp/${IMAGE_NAME}*" - exec_cmd "docker rmi ${IMAGE_NAME} --force" +cleanup() { + exec_cmd "rm -rf /tmp/${IMAGE_NAME}*" + exec_cmd "docker rmi ${IMAGE_NAME} --force" } -docu(){ - exec_cmd "template/create_template_documentation" +docu() { + exec_cmd "template/create_template_documentation" } -publish_image(){ - TIMESTAMP="$(date --date="today" "+%Y-%m-%d_%H-%M-%S")" - exec_cmd "docker tag ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION} ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION}" - exec_cmd "docker push ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION}" - exec_cmd "docker tag ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION} ${IMAGE_REPO}/${IMAGE_NAME}:latest" - exec_cmd "docker push ${IMAGE_REPO}/${IMAGE_NAME}:latest" +publish_image() { + TIMESTAMP="$(date --date="today" "+%Y-%m-%d_%H-%M-%S")" + exec_cmd "docker tag ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION} ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION}" + exec_cmd "docker push ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION}" + exec_cmd "docker tag ${IMAGE_REPO}/${IMAGE_NAME}:${VERSION} ${IMAGE_REPO}/${IMAGE_NAME}:latest" + exec_cmd "docker push ${IMAGE_REPO}/${IMAGE_NAME}:latest" } -display_hint(){ - notice "CMD:" - echo - echo "$0 .. " - echo - notice "AVAILABLE PHASES:" - echo " - default" - echo " ($DEFAULT_PHASES)" - echo " - inspect" - for PHASE in $DEFAULT_PHASES; do - echo " - $PHASE " - done - echo " - publish_image (optional)" - echo " - inspect (optional)" +display_hint() { + notice "CMD:" + echo + echo "$0 .. " + echo + notice "AVAILABLE PHASES:" + echo " - default" + echo " ($DEFAULT_PHASES)" + echo " - inspect" + for PHASE in $DEFAULT_PHASES; do + echo " - $PHASE " + done + echo " - publish_image (optional)" + echo " - inspect (optional)" } - -if [ ${#@} -lt 2 ];then - display_hint - exit 2 +if [ ${#@} -lt 2 ]; then + display_hint + exit 2 fi IMAGE_REPO="${@: -1}" @@ -126,26 +124,25 @@ if type $IMAGE_REPO &>/dev/null;then fi PHASES="" -for arg in "${@:1:$(( ${#@} - 1 ))}"; do - if [ "$arg" = "default" ];then - PHASES="$PHASES $DEFAULT_PHASES" - else - PHASES="$PHASES $arg" - fi +for arg in "${@:1:$((${#@} - 1))}"; do + if [ "$arg" = "default" ]; then + PHASES="$PHASES $DEFAULT_PHASES" + else + PHASES="$PHASES $arg" + fi done IMAGE_NAME="k8s-zabbix" IMAGE_BASE="${IMAGE_REPO}/${IMAGE_NAME}:${VERSION}" -for PHASE in $PHASES; -do - if ( type "$PHASE" >/dev/null 2>&1 );then - notice "INFO: PHASE >>>$PHASE<<< for $IMAGE_BASE" - $PHASE - else - notice "ERROR: no such phase : $PHASE" - exit 1 - fi +for PHASE in $PHASES; do + if (type "$PHASE" >/dev/null 2>&1); then + notice "INFO: PHASE >>>$PHASE<<< for $IMAGE_BASE" + $PHASE + else + notice "ERROR: no such phase : $PHASE" + exit 1 + fi done #SIZE="$(docker inspect $IMAGE_BASE --format='{{.Size}}')" diff --git a/check_kubernetesd b/check_kubernetesd index 591a8e4..cd6210d 100755 --- a/check_kubernetesd +++ b/check_kubernetesd @@ -10,23 +10,21 @@ import logging import os import signal import sys +import time from typing import List import coloredlogs as coloredlogs import sentry_sdk -import time -from base.daemon_thread import CheckKubernetesDaemon from base.config import Configuration +from base.daemon_thread import CheckKubernetesDaemon -KNOWN_ACTIONS = ['discover', 'get'] +KNOWN_ACTIONS = ["discover", "get"] formatter_string = '%(asctime)s - %(threadName)s : {%(name)s:%(lineno)d} : %(levelname)s : %(message)s' -formatter = logging.Formatter(formatter_string) -stream = logging.StreamHandler(sys.stdout) -stream.setFormatter(formatter) -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") + if __name__ == '__main__': parser = argparse.ArgumentParser( @@ -45,8 +43,6 @@ if __name__ == '__main__': "binary with a fully qualified file path") args = parser.parse_args() - coloredlogs.install(fmt=formatter_string, isatty=(not args.disable_colors)) - config = Configuration() if args.ini_file: @@ -72,51 +68,71 @@ if __name__ == '__main__': sentry_sdk.init(config.sentry_dsn) if config.zabbix_debug: - logger.info("starting with zabbix debug") config.zabbix_debug = True - log = logging.getLogger('pyzabbix') + log = logging.getLogger("pyzabbix") log.setLevel(logging.DEBUG) else: - log = logging.getLogger('pyzabbix') + log = logging.getLogger("pyzabbix") log.setLevel(logging.INFO) if config.debug: - stream.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) - logger.addHandler(stream) + + coloredlogs.install(level=logging.getLevelName(logger.getEffectiveLevel()), + fmt=formatter_string, + isatty=(not args.disable_colors)) + + if config.debug: + logger.debug("Starting with DEBUG enabled") + if config.zabbix_debug: + logger.info("starting with zabbix debug") + if config.zabbix_dry_run: + logger.warning("starting with zabbix_dry_run True, not sending to Zabbix!") daemons: List[CheckKubernetesDaemon] = list() mgmt_daemon = CheckKubernetesDaemon(config, ['nodes'], - config.discovery_interval_slow, config.resend_data_interval_slow) + config.discovery_interval_slow, + config.resend_data_interval_slow, + config.data_refresh_interval) daemons.append(mgmt_daemon) daemons.append( CheckKubernetesDaemon(config, - # ['components', 'services', 'pvcs'], - ['components', 'pvcs'], - config.discovery_interval_slow, config.resend_data_interval_fast)) + ['components', 'services', 'pvcs'], + # ['components', 'pvcs'], + config.discovery_interval_slow, + config.resend_data_interval_fast, + config.data_refresh_interval)) + + daemons.append( + CheckKubernetesDaemon(config, + # ['deployments', 'statefulsets', 'daemonsets', 'pods', 'containers', + ["statefulsets", "deployments", "daemonsets"], + config.discovery_interval_slow, + config.resend_data_interval_slow, + config.data_refresh_interval)) daemons.append( CheckKubernetesDaemon(config, # ['deployments', 'statefulsets', 'daemonsets', 'pods', 'containers', # 'ingresses', 'containers', 'pods'], - ['deployments', 'statefulsets', "daemonsets", "pods"], - config.discovery_interval_slow, config.resend_data_interval_slow)) + ["pods", "containers"], + config.discovery_interval_fast, + config.resend_data_interval_fast, + config.data_refresh_interval)) if config.debug_k8s_events: for daemon in daemons: daemon.debug_k8s_events = True - # SIGNAL processing def _signal_handler(signum, *args): mgmt_daemon.handler(signum) - def stacktraces_and_terminate(signum, frame): print("#" * 80) print("# Threadump") @@ -125,7 +141,6 @@ if __name__ == '__main__': print("#" * 80) sys.exit(1) - signal.signal(signal.SIGQUIT, stacktraces_and_terminate) signal.signal(signal.SIGUSR1, _signal_handler) signal.signal(signal.SIGUSR2, _signal_handler) diff --git a/config_default.ini b/config_default.ini new file mode 100644 index 0000000..6beba07 --- /dev/null +++ b/config_default.ini @@ -0,0 +1,42 @@ +k8s_config_type = token +#k8s_api_host = https://example.kube-apiserver.com +#k8s_api_token = "" + +verify_ssl = True +debug = False +debug_k8s_events = False +# resources_exclude = components, statefulsets, daemonsets, nodes, services, pvcs, deployments +#resources_exclude = +namespace_exclude_re = ^\d\d\d\d$ + +container_crawling = container + +sentry_enabled = False +sentry_dsn = "" + +zabbix_server = example.zabbix-server.com +#zabbix_resources_exclude = components, statefulsets, daemonsets, nodes +zabbix_resources_exclude = +zabbix_host = k8s-example-host +zabbix_debug = False +zabbix_single_debug = False +zabbix_dry_run = False + +web_api_enable = False +web_api_resources_exclude = daemonsets, components, services, statefulsets +web_api_verify_ssl = True +web_api_host = https://example.api.com/api/v1/k8s +web_api_token = +web_api_cluster = k8s-test-cluster + +discovery_interval_fast = 900 +resend_data_interval_fast = 120 + +discovery_interval_slow = 7200 +resend_data_interval_slow = 1800 + +discovery_interval_delay = 120 +data_resend_interval_delay = 180 + +# force refresh with disc +data_refresh_interval = 14400 diff --git a/documentation/template/custom_service_kubernetes.html b/documentation/template/custom_service_kubernetes.html index c3e9c35..f3bfb62 100644 --- a/documentation/template/custom_service_kubernetes.html +++ b/documentation/template/custom_service_kubernetes.html @@ -171,7 +171,7 @@

Graph Overview

NameElements -Pod {#NAMESPACE} / {#NAME} - Launch Statisticscheck_kubernetesd[get,containers,{#NAMESPACE},{#NAME},{#CONTAINER},ready]
check_kubernetesd[get,containers,{#NAMESPACE},{#NAME},{#CONTAINER},not_ready]
check_kubernetesd[get,containers,{#NAMESPACE},{#NAME},{#CONTAINER},restart_count]
+Container {#NAMESPACE} / {#NAME} - Launch Statisticscheck_kubernetesd[get,containers,{#NAMESPACE},{#NAME},{#CONTAINER},ready]
check_kubernetesd[get,containers,{#NAMESPACE},{#NAME},{#CONTAINER},not_ready]
check_kubernetesd[get,containers,{#NAMESPACE},{#NAME},{#CONTAINER},restart_count]
@@ -425,6 +425,58 @@

Item Overview

TRAP{#NAME} - condition_status_failedcheck_kubernetesd[get,nodes,{#NAME},condition_status_failed]00 +

Discovery rule "Custom - Service - Kubernetes - Pods"

+ + + + + + + + + + + + + + + + +
NameValue
NameCustom - Service - Kubernetes - Pods
Keycheck_kubernetesd[discover,pods]
TypeTRAP
Delay0
+

Trigger Overview

+ + + + +
NameDescriptionPriorityExpressionDependencies
+

Graph Overview

+ + + + + + + +
NameElements
Pod {#NAMESPACE} / {#NAME} - Launch Statisticscheck_kubernetesd[get,pods,{#NAMESPACE},{#NAME},ready]
check_kubernetesd[get,pods,{#NAMESPACE},{#NAME},not_ready]
check_kubernetesd[get,pods,{#NAMESPACE},{#NAME},restart_count]
+
+

Item Overview

+ + + + + + + + + + + + + + + + +
TypeNameKeyDescriptionInterval (sec)History DaysTrend Days
TRAP{#NAMESPACE} / {#NAME} - not_readycheck_kubernetesd[get,pods,{#NAMESPACE},{#NAME},not_ready]014d
TRAP{#NAMESPACE} / {#NAME} - readycheck_kubernetesd[get,pods,{#NAMESPACE},{#NAME},ready]014d
TRAP{#NAMESPACE} / {#NAME} - restart_countcheck_kubernetesd[get,pods,{#NAMESPACE},{#NAME},restart_count]014d
TRAP{#NAMESPACE} / {#NAME} - statuscheck_kubernetesd[get,pods,{#NAMESPACE},{#NAME},status]014d0

Discovery rule "Custom - Service - Kubernetes - PVCs"

diff --git a/k8sobjects/__init__.py b/k8sobjects/__init__.py index e5653dc..1747e77 100644 --- a/k8sobjects/__init__.py +++ b/k8sobjects/__init__.py @@ -1,11 +1,11 @@ -from .deployment import * +from .component import * +from .container import * from .daemonset import * +from .deployment import * from .ingress import * from .node import * -from .pvc import * from .pod import * +from .pvc import * +from .secret import * from .service import * from .statefulset import * -from .secret import * -from .component import * -from .container import * diff --git a/k8sobjects/component.py b/k8sobjects/component.py index fda2215..7bb61e5 100644 --- a/k8sobjects/component.py +++ b/k8sobjects/component.py @@ -4,12 +4,15 @@ from .k8sobject import K8sObject -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class Component(K8sObject): object_type = 'component' + def get_list(self): + return self.manager.api.list_component_status() + @property def resource_data(self): data = super().resource_data @@ -38,7 +41,6 @@ def resource_data(self): def get_zabbix_metrics(self): data_to_send = list() - data_to_send.append(ZabbixMetric( self.zabbix_host, 'check_kubernetesd[get,components,%s,available_status]' % self.name, diff --git a/k8sobjects/container.py b/k8sobjects/container.py index e7b2b90..247c7ef 100644 --- a/k8sobjects/container.py +++ b/k8sobjects/container.py @@ -2,7 +2,7 @@ from pyzabbix import ZabbixMetric -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") def get_container_zabbix_metrics(zabbix_host: str, name_space: str, diff --git a/k8sobjects/daemonset.py b/k8sobjects/daemonset.py index fe514da..ea53e5c 100644 --- a/k8sobjects/daemonset.py +++ b/k8sobjects/daemonset.py @@ -4,7 +4,7 @@ from .k8sobject import K8sObject, transform_value -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") # https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#daemonset-v1-apps @@ -21,7 +21,10 @@ # 'updated_number_scheduled': 8}} class Daemonset(K8sObject): - object_type = 'daemonset' + object_type = "daemonset" + + def get_list(self): + return self.manager.api.list_daemon_set_for_all_namespaces() @property def resource_data(self): diff --git a/k8sobjects/deployment.py b/k8sobjects/deployment.py index 11b662d..afa716a 100644 --- a/k8sobjects/deployment.py +++ b/k8sobjects/deployment.py @@ -1,30 +1,34 @@ import logging from pyzabbix import ZabbixMetric + from .k8sobject import K8sObject, transform_value -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class Deployment(K8sObject): - object_type = 'deployment' + object_type = "deployment" + + def get_list(self): + return self.manager.api.list_deployment_for_all_namespaces() @property def resource_data(self): data = super().resource_data - for status_type in self.data['status']: - if status_type == 'conditions': + for status_type in self.data["status"]: + if status_type == "conditions": continue - data.update({status_type: transform_value(self.data['status'][status_type])}) + data.update({status_type: transform_value(self.data["status"][status_type])}) failed_conds = [] - if self.data['status']['conditions']: - available_conds = [x for x in self.data['status']['conditions'] if x['type'].lower() == "available"] - if available_conds: - for cond in available_conds: - if cond['status'] != 'True': - failed_conds.append(cond['type']) + if self.data["status"]["conditions"]: + available_conds = [x for x in self.data["status"]["conditions"] if x["type"].lower() == "available"] + for cond in available_conds: + if cond["status"] != "True": + logger.info(self.data["status"]["conditions"]) + failed_conds.append(cond["type"]) if len(failed_conds) > 0: data['available_status'] = 'ERROR: ' + (','.join(failed_conds)) @@ -36,20 +40,21 @@ def resource_data(self): def get_zabbix_metrics(self): data_to_send = [] + rd = self.resource_data - for status_type in self.data['status']: - if status_type == 'conditions': + for status_type in self.data["status"]: + if status_type == "conditions": continue data_to_send.append(ZabbixMetric( self.zabbix_host, 'check_kubernetesd[get,deployments,%s,%s,%s]' % (self.name_space, self.name, status_type), - transform_value(self.resource_data[status_type])) + transform_value(rd[status_type])) ) data_to_send.append(ZabbixMetric( self.zabbix_host, 'check_kubernetesd[get,deployments,%s,%s,available_status]' % (self.name_space, self.name), - self.resource_data['available_status'])) + rd['available_status'])) return data_to_send diff --git a/k8sobjects/ingress.py b/k8sobjects/ingress.py index 7d10489..9b30950 100644 --- a/k8sobjects/ingress.py +++ b/k8sobjects/ingress.py @@ -2,11 +2,11 @@ from .k8sobject import K8sObject -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class Ingress(K8sObject): - object_type = 'ingress' + object_type = "ingress" @property def resource_data(self): diff --git a/k8sobjects/k8sobject.py b/k8sobjects/k8sobject.py index f156a05..b941717 100644 --- a/k8sobjects/k8sobject.py +++ b/k8sobjects/k8sobject.py @@ -1,9 +1,9 @@ import datetime import hashlib +import importlib import json import logging import re - from typing import TYPE_CHECKING, TypedDict if TYPE_CHECKING: @@ -11,20 +11,20 @@ from pyzabbix import ZabbixMetric -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") K8S_RESOURCES = dict( - nodes='node', - components='component', - services='service', - deployments='deployment', - statefulsets='statefulset', - daemonsets='daemonset', - pods='pod', - containers='container', - secrets='secret', - ingresses='ingress', - pvcs='pvc' + nodes="node", + components="component", + services="service", + deployments="deployment", + statefulsets="statefulset", + daemonsets="daemonset", + pods="pod", + containers="container", + secrets="secret", + ingresses="ingress", + pvcs="pvc", ) INITIAL_DATE = datetime.datetime(2000, 1, 1, 0, 0) @@ -57,7 +57,7 @@ def transform_value(value: str) -> str: def slugit(name_space: str, name: str, maxlen: int) -> str: if name_space: - slug = name_space + '/' + name + slug = name_space + "/" + name else: slug = name @@ -100,6 +100,7 @@ def __init__(self, obj_data: ObjectDataType, resource: str, manager: 'K8sResourc """Get the resource data from the k8s api""" self.is_dirty_zabbix = True self.is_dirty_web = True + self.added = INITIAL_DATE self.last_sent_zabbix_discovery = INITIAL_DATE self.last_sent_zabbix = INITIAL_DATE self.last_sent_web = INITIAL_DATE @@ -116,7 +117,8 @@ def __str__(self) -> str: def resource_data(self) -> dict[str, str]: """ customized values for k8s objects """ if self.name_space is None: - raise RuntimeError("name_space is None for %s" % self.name) + if self.resource.lower() not in ["nodes", "components"]: + raise RuntimeError(f"name_space is None for [{self.resource}] {self.name}") return dict( name=self.name, name_space=self.name_space @@ -127,12 +129,14 @@ def uid(self) -> str: if not hasattr(self, 'object_type'): raise AttributeError('No object_type set! Dont use K8sObject itself!') elif not self.name: - raise AttributeError('No name set for K8sObject.uid! [%s] name_space: %s, name: %s' - % (self.object_type, self.name_space, self.name)) + raise AttributeError( + "No name set for K8sObject.uid! [%s] name_space: %s, name: %s" + % (self.object_type, self.name_space, self.name) + ) if self.name_space: - return self.object_type + '_' + self.name_space + '_' + self.name - return self.object_type + '_' + self.name + return self.object_type + "_" + self.name_space + "_" + self.name + return self.object_type + "_" + self.name @property def name(self) -> str: @@ -144,16 +148,41 @@ def name(self) -> str: @property def name_space(self) -> str | None: - from .node import Node from .component import Component + from .node import Node if isinstance(self, Node) or isinstance(self, Component): return None - name_space = self.data.get('metadata', {}).get('namespace') + name_space = self.data.get("metadata", {}).get("namespace") if not name_space: - raise Exception('Could not find name_space for obj [%s] %s' % (self.resource, self.name)) + raise Exception("Could not find name_space for obj [%s] %s" % (self.resource, self.name)) return name_space + def slug(self, name): + return slugit(self.name_space or "None", name, 40) + + def get_uid_list_and_data(self, data=None): + uid_ret = [] + data_ret = {} + + # if no data was fetched and passed before: fetch it + if self.resource == 'pvcs': + obj_list = self.get_list() + else: + obj_list = self.get_list().items + + for obj in obj_list: + if self.resource == 'pvcs': + d = obj + uid_ret.append(d.uid) + data_ret[d.uid] = d + else: + d = obj.to_dict() + n = self.manager.resource_class(d, self.resource, manager=self.manager) + uid_ret.append(n.uid) + data_ret[n.uid] = n + return uid_ret, data_ret + def is_unsubmitted_web(self) -> bool: return self.last_sent_web == INITIAL_DATE @@ -167,7 +196,7 @@ def get_zabbix_discovery_data(self) -> list[dict[str, str]]: return [{ "{#NAME}": self.name, "{#NAMESPACE}": self.name_space or "None", - "{#SLUG}": slugit(self.name_space or "None", self.name, 40), + "{#SLUG}": self.slug(self.name), }] def get_discovery_for_zabbix(self, discovery_data: list[dict[str, str]] | None) -> ZabbixMetric: @@ -176,10 +205,12 @@ def get_discovery_for_zabbix(self, discovery_data: list[dict[str, str]] | None) return ZabbixMetric( self.zabbix_host, - 'check_kubernetesd[discover,%s]' % self.resource, - json.dumps({ - 'data': discovery_data, - }) + "check_kubernetesd[discover,%s]" % self.resource, + json.dumps( + { + "data": discovery_data, + } + ), ) def get_zabbix_metrics(self) -> list[ZabbixMetric]: diff --git a/k8sobjects/k8sresourcemanager.py b/k8sobjects/k8sresourcemanager.py index a78b4a1..52c2bad 100644 --- a/k8sobjects/k8sresourcemanager.py +++ b/k8sobjects/k8sresourcemanager.py @@ -1,15 +1,23 @@ import importlib import logging +from datetime import datetime +from kubernetes.client import (AppsV1Api, CoreV1Api, + ApiextensionsV1Api) + +from base.config import Configuration from k8sobjects.k8sobject import K8S_RESOURCES, K8sObject -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class K8sResourceManager: - def __init__(self, resource: str, zabbix_host: str | None = None): + def __init__(self, resource: str, apis: dict | None = None, + zabbix_host: str | None = None, config: Configuration | None = None): self.resource = resource + self.apis = apis self.zabbix_host = zabbix_host + self.config = config self.objects: dict[str, K8sObject] = dict() self.containers: dict = dict() # containers only used for pods @@ -17,30 +25,45 @@ def __init__(self, resource: str, zabbix_host: str | None = None): mod = importlib.import_module('k8sobjects') class_label = K8S_RESOURCES[resource] self.resource_class = getattr(mod, class_label.capitalize(), None) + if self.resource_class is not None: + self.resource_meta = self.resource_class(None, self.resource, manager=self) + logger.info(f"Creating new resource manager for resource {resource} with class {self.resource_class}") + self.api = self.get_api_for_resource(resource) + + def get_api_for_resource(self, resource: str) -> CoreV1Api | AppsV1Api | ApiextensionsV1Api: + if resource in ['nodes', 'components', 'secrets', 'pods', 'services', 'pvcs']: + api = self.apis.get('core_v1') + elif resource in ["deployments", "daemonsets", "statefulsets"]: + api = self.apis.get('apps_v1') + elif resource in ["ingresses"]: + api = self.apis.get('extensions_v1') + elif resource == 'containers': + api = None + else: + raise AttributeError("No valid resource found: %s" % resource) + return api + def add_obj_from_data(self, data: dict) -> K8sObject | None: if not self.resource_class: logger.error('No Resource Class found for "%s"' % self.resource) return None - try: - new_obj = self.resource_class(data, self.resource, manager=self) - return self.add_obj(new_obj) - except Exception as e: - logger.fatal(f"Unable to add object by data : {e} - >>><{data}<<") - return None + new_obj = self.resource_class(data, self.resource, manager=self) + return self.add_obj(new_obj) def add_obj(self, new_obj: K8sObject) -> K8sObject | None: - if new_obj.uid not in self.objects: # new object self.objects[new_obj.uid] = new_obj + new_obj.added = datetime.now() elif self.objects[new_obj.uid].data_checksum != new_obj.data_checksum: # existing object with modified data new_obj.last_sent_zabbix_discovery = self.objects[new_obj.uid].last_sent_zabbix_discovery new_obj.last_sent_zabbix = self.objects[new_obj.uid].last_sent_zabbix new_obj.last_sent_web = self.objects[new_obj.uid].last_sent_web + new_obj.added = self.objects[new_obj.uid].added new_obj.is_dirty_web = True new_obj.is_dirty_zabbix = True self.objects[new_obj.uid] = new_obj @@ -48,12 +71,18 @@ def add_obj(self, new_obj: K8sObject) -> K8sObject | None: # return created or updated object return self.objects[new_obj.uid] - def del_obj(self, obj: K8sObject) -> K8sObject | None: + def del_obj(self, obj: str | dict) -> K8sObject | None: if not self.resource_class: logger.error('No Resource Class found for "%s"' % self.resource) return None - resourced_obj = self.resource_class(obj, self.resource, manager=self) - if resourced_obj.uid in self.objects: - del self.objects[resourced_obj.uid] + if isinstance(obj, str): + # find by string + resourced_obj = self.objects[obj] + del self.objects[obj] + else: + # find by dict data + resourced_obj = self.resource_class(obj, self.resource, manager=self) + if resourced_obj.uid in self.objects: + del self.objects[resourced_obj.uid] return resourced_obj diff --git a/k8sobjects/node.py b/k8sobjects/node.py index 85ad03e..d09f5e0 100644 --- a/k8sobjects/node.py +++ b/k8sobjects/node.py @@ -6,7 +6,7 @@ from .k8sobject import K8sObject, transform_value -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") # TODO: remove after refactoring @@ -20,38 +20,47 @@ def get_node_names(api: CoreV1Api) -> list[str]: class Node(K8sObject): - object_type = 'node' - - MONITOR_VALUES = ['allocatable.cpu', - 'allocatable.ephemeral-storage', - 'allocatable.memory', - 'allocatable.pods', - 'capacity.cpu', - 'capacity.ephemeral-storage', - 'capacity.memory', - 'capacity.pods'] + object_type = "node" + + MONITOR_VALUES = [ + "allocatable.cpu", + "allocatable.ephemeral-storage", + "allocatable.memory", + "allocatable.pods", + "capacity.cpu", + "capacity.ephemeral-storage", + "capacity.memory", + "capacity.pods", + ] + + def get_list(self): + return self.manager.api.list_node() @property def resource_data(self): data = super().resource_data failed_conds = [] - data['condition_ready'] = False - for cond in self.data['status']['conditions']: - if cond['type'].lower() == "ready" and cond['status'] == 'True': - data['condition_ready'] = True - else: - if cond['status'] == 'True': - failed_conds.append(cond['type']) + data["condition_ready"] = False + for cond in self.data["status"]["conditions"]: + if cond["type"].lower() == "ready" and cond["status"] == "True": + data["condition_ready"] = True + elif cond["status"] == "True": + failed_conds.append(cond["type"]) - data['failed_conds'] = failed_conds + data["failed_conds"] = failed_conds for monitor_value in self.MONITOR_VALUES: - current_indirection = self.data['status'] + current_indirection = self.data["status"] + is_found = True for key in monitor_value.split("."): - current_indirection = current_indirection[key] + if key in current_indirection: + current_indirection = current_indirection[key] + else: + is_found = False - data[monitor_value] = transform_value(current_indirection) + if is_found: + data[monitor_value] = transform_value(current_indirection) return data @@ -60,14 +69,25 @@ def get_zabbix_metrics(self): data = self.resource_data data_to_send.append( - ZabbixMetric(self.zabbix_host, 'check_kubernetesd[get,nodes,' + self.name + ',available_status]', - 'not available' if data['condition_ready'] is not True else 'OK')) + ZabbixMetric( + self.zabbix_host, + "check_kubernetesd[get,nodes," + self.name + ",available_status]", + "not available" if data["condition_ready"] is not True else "OK", + ) + ) data_to_send.append( - ZabbixMetric(self.zabbix_host, 'check_kubernetesd[get,nodes,' + self.name + ',condition_status_failed]', - data['failed_conds'] if len(data['failed_conds']) > 0 else 'OK')) + ZabbixMetric( + self.zabbix_host, + "check_kubernetesd[get,nodes," + self.name + ",condition_status_failed]", + data["failed_conds"] if len(data["failed_conds"]) > 0 else "OK", + ) + ) for monitor_value in self.MONITOR_VALUES: - data_to_send.append(ZabbixMetric( - self.zabbix_host, 'check_kubernetesd[get,nodes,%s,%s]' % (self.name, monitor_value), - transform_value(data[monitor_value])) + data_to_send.append( + ZabbixMetric( + self.zabbix_host, + "check_kubernetesd[get,nodes,%s,%s]" % (self.name, monitor_value), + transform_value(data[monitor_value]), + ) ) return data_to_send diff --git a/k8sobjects/pod.py b/k8sobjects/pod.py index 7ed2df2..bedf1b9 100644 --- a/k8sobjects/pod.py +++ b/k8sobjects/pod.py @@ -1,154 +1,186 @@ import logging +import json import re +from pprint import pformat +from pyzabbix import ZabbixMetric -from k8sobjects import K8sObject +from k8sobjects import K8sObject, transform_value -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class Pod(K8sObject): + """ Pod discovery is used also for containers """ object_type = 'pod' kind = None + def get_list(self): + return self.manager.api.list_pod_for_all_namespaces() + @property def name(self) -> str: - if 'metadata' not in self.data and 'name' in self.data['metadata']: - raise Exception(f'Could not find name in metadata for resource {self.resource}') + return self.real_name + + @property + def real_name(self) -> str: + if 'metadata' in self.data and 'name' in self.data['metadata']: + return self.data['metadata']['name'] + raise Exception(f'Could not find name with metadata in data for resource {self.resource}: {self.data}') + + @property + def base_name(self) -> str: + if "owner_references" in self.data['metadata'] and self.data['metadata']['owner_references'] is not None: + try: + self.kind = self.data['metadata']['owner_references'][0]['kind'] + except Exception as e: + logger.warning("Pod base_name: metadata: %s, error: %s" % (self.data['metadata'], str(e))) + self.kind = None + + generate_name = self.real_name - if "owner_references" in self.data['metadata']: - for owner_refs in self.data['metadata']['owner_references']: - self.kind = owner_refs['kind'] + # override with generate_name + if "generate_name" in self.data['metadata'] and self.data['metadata']['generate_name']: + generate_name = self.data['metadata']['generate_name'] - generate_name = self.data['metadata']['generate_name'] + ret_name = "" if generate_name is not None: match self.kind: case "Job": - name = re.sub(r'-\d+-$', '', generate_name) + ret_name = re.sub(r'-\d+-$', '', generate_name) case "ReplicaSet": - name = re.sub(r'-[a-f0-9]{4,}-$', '', generate_name) + ret_name = re.sub(r'-[a-f0-9]{4,}-$', '', generate_name) case _: - name = re.sub(r'-$', '', generate_name) + try: + ret_name = re.sub(r'-$', '', generate_name) + except Exception as e: + logger.warning("Container name Exception in Pod: %s\ngenerate_name:%s\ndata:%s\n: %s" % + (self.kind, generate_name, pformat(self.data, indent=2), str(e))) + return ret_name - return name + @property + def resource_data(self): + data = super().resource_data + data["containers"] = json.dumps(self.containers) + container_status = dict() + data["ready"] = True + pod_data = { + "restart_count": 0, + "ready": 0, + "not_ready": 0, + "status": "OK", + } + self.phase = self.data["status"]["phase"] - def get_zabbix_discovery_data(self) -> list[dict[str, str]]: - data = super().get_zabbix_discovery_data() - if self.kind is not None: - data[0]['{#KIND}'] = self.kind + if "container_statuses" in self.data["status"] and self.data["status"]["container_statuses"] is not None: + for container in self.data["status"]["container_statuses"]: + status_values = [] + container_name = container["name"] + + # this pod data + if container_name not in container_status: + container_status[container_name] = { + "restart_count": 0, + "ready": 0, + "not_ready": 0, + "status": "OK", + } + container_status[container_name]["restart_count"] += container["restart_count"] + pod_data["restart_count"] += container["restart_count"] + + if container["ready"] is True: + container_status[container_name]["ready"] += 1 + pod_data["ready"] += 1 + # There are 5 possible Pod phases: Pending, Running, Succeeded, Failed, Unknown + # Only Failed and Unknown should throw an Error + elif self.phase not in ["Succeeded", "Running", "Pending"]: + container_status[container_name]["not_ready"] += 1 + pod_data["not_ready"] += 1 + + if container["state"] and len(container["state"]) > 0: + for status, container_data in container["state"].items(): + try: + reason = container["state"][status]["reason"] + except Exception: + reason = "" + + if container_data is not None and status == "terminated" and reason != "Completed": + status_values.append("Terminated") + + if self.phase == "Pending" and reason == 'ImagePullBackOff': + container_status[container_name]["not_ready"] += 1 + pod_data["not_ready"] += 1 + status_values.append('ImagePullBackOff') + + if len(status_values) > 0: + container_status[container_name]["status"] = "ERROR: " + (",".join(status_values)) + pod_data["status"] = container_status[container_name]["status"] + data["ready"] = False + + data["container_status"] = json.dumps(container_status) + data["pod_data"] = json.dumps(pod_data) return data @property - def resource_data(self) -> dict[str, str]: - data = super().resource_data + def containers(self): + containers = {} + for container in self.data["spec"]["containers"]: + containers.setdefault(container["name"], 0) + containers[container["name"]] += 1 + return containers + + def get_zabbix_discovery_data(self) -> list[dict[str, str]]: + # Main Methode + data = [] + if self.manager.config.container_crawling == 'container': + for container in self.containers: + name = self.base_name + data += [ + { + "{#NAMESPACE}": self.name_space, + "{#NAME}": name, + "{#CONTAINER}": container, + "{#SLUG}": self.slug(name), + } + ] + else: + data += [ + { + "{#NAMESPACE}": self.name_space, + "{#NAME}": self.real_name, + } + ] return data def get_zabbix_metrics(self): - # TODO: Temporary - # data = self.resource_data data_to_send = list() + + self.data["status"].pop('conditions', None) + rd = self.resource_data + pod_data = json.loads(rd["pod_data"]) + + if self.manager.config.container_crawling == 'pod': + for status_type in pod_data: + data_to_send.append(ZabbixMetric( + self.zabbix_host, + 'check_kubernetesd[get,pods,%s,%s,%s]' % (self.name_space, self.name, status_type), + transform_value(pod_data[status_type])) + ) + return data_to_send -# class Pod(K8sObject): -# object_type = 'pod' -# -# @property -# def base_name(self): -# for container in self.data['spec']['containers']: -# if container['name'] in self.name: -# return container['name'] -# return self.name -# -# @property -# def containers(self): -# containers = {} -# for container in self.data['spec']['containers']: -# containers.setdefault(container['name'], 0) -# containers[container['name']] += 1 -# return containers -# -# @property -# def resource_data(self): -# data = super().resource_data -# data['containers'] = json.dumps(self.containers) -# container_status = dict() -# data['ready'] = True -# pod_data = { -# "restart_count": 0, -# "ready": 0, -# "not_ready": 0, -# "status": "OK", -# } -# -# if "container_statuses" in self.data['status'] and self.data['status']['container_statuses']: -# for container in self.data['status']['container_statuses']: -# status_values = [] -# container_name = container['name'] -# -# # this pod data -# if container_name not in container_status: -# container_status[container_name] = { -# "restart_count": 0, -# "ready": 0, -# "not_ready": 0, -# "status": "OK", -# } -# container_status[container_name]['restart_count'] += container['restart_count'] -# pod_data['restart_count'] += container['restart_count'] -# -# if container['ready'] is True: -# container_status[container_name]['ready'] += 1 -# pod_data['ready'] += 1 -# else: -# container_status[container_name]['not_ready'] += 1 -# pod_data['not_ready'] += 1 -# -# if container["state"] and len(container["state"]) > 0: -# for status, container_data in container["state"].items(): -# if container_data and status != "running": -# status_values.append(status) -# -# if len(status_values) > 0: -# container_status[container_name]['status'] = 'ERROR: ' + (','.join(status_values)) -# pod_data['status'] = container_status[container_name]['status'] -# data['ready'] = False -# -# data['container_status'] = json.dumps(container_status) -# data['pod_data'] = json.dumps(pod_data) -# return data -# -# def get_zabbix_discovery_data(self): -# data = list() -# for container in self.containers: -# data += [{ -# "{#NAMESPACE}": self.name_space, -# "{#NAME}": self.base_name, -# "{#CONTAINER}": container, -# }] -# return data -# -# def get_discovery_for_zabbix(self, discovery_data=None): -# if discovery_data is None: -# discovery_data = self.get_zabbix_discovery_data() -# -# return ZabbixMetric( -# self.zabbix_host, -# 'check_kubernetesd[discover,containers]', -# json.dumps({ -# 'data': discovery_data, -# }) -# ) -# -# def get_zabbix_metrics(self): -# data = self.resource_data -# data_to_send = list() -# -# # if 'status' not in data: -# # logger.error(data) -# # -# # for k, v in pod_data.items(): -# # data_to_send.append(ZabbixMetric( -# # self.zabbix_host, 'check_kubernetesd[get,pods,%s,%s,%s]' % (self.name_space, self.name, k), -# # v, -# # )) -# -# return data_to_send + def get_discovery_for_zabbix(self, discovery_data=None): + if discovery_data is None: + discovery_data = self.get_zabbix_discovery_data() + + if self.manager.config.container_crawling == 'container': + discovery_string = "check_kubernetesd[discover, containers]" + else: + discovery_string = "check_kubernetesd[discover, pods]" + return ZabbixMetric( + self.zabbix_host, + discovery_string, + json.dumps( + { + "data": discovery_data, + }), + ) diff --git a/k8sobjects/pvc.py b/k8sobjects/pvc.py index 3a77fb5..31d2cff 100644 --- a/k8sobjects/pvc.py +++ b/k8sobjects/pvc.py @@ -6,10 +6,10 @@ from pyzabbix import ZabbixMetric from . import get_node_names -from .k8sobject import K8sObject, ObjectDataType, MetadataObjectType +from .k8sobject import K8sObject, MetadataObjectType, ObjectDataType from .k8sresourcemanager import K8sResourceManager -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") def _get_pvc_data_for_node(api: CoreV1Api, node: str, pvc_volumes: list[K8sObject], timeout_seconds: int, @@ -23,26 +23,25 @@ def _get_pvc_data_for_node(api: CoreV1Api, node: str, pvc_volumes: list[K8sObjec header_params['Accept'] = api.api_client.select_header_accept( ['application/json', 'application/yaml', 'application/vnd.kubernetes.protobuf', 'application/json;stream=watch', 'application/vnd.kubernetes.protobuf;stream=watch']) # noqa: E501 - - auth_settings = ['BearerToken'] # noqa: E501 - path_params = {'node': node} + auth_settings = ["BearerToken"] # noqa: E501 + path_params = {"node": node} logger.debug(f"Getting pvc infos for node {node}") ret = api.api_client.call_api( - '/api/v1/nodes/{node}/proxy/stats/summary', - 'GET', + "/api/v1/nodes/{node}/proxy/stats/summary", + "GET", path_params, query_params, header_params, body=body_params, post_params=form_params, files=local_var_files, - response_type='str', # noqa: E501 + response_type="str", # noqa: E501 auth_settings=auth_settings, async_req=False, _return_http_data_only=True, _preload_content=False, _request_timeout=timeout_seconds, - collection_formats={} + collection_formats={}, ) loaded_json = json.loads(ret.data) @@ -79,11 +78,13 @@ def _process_volume(item: dict, namespace_exclude_re: str, node: str, owner_references=list()) volume['nodename'] = node - volume['usedBytesPercentage'] = float(float( - volume['usedBytes'] / volume['capacityBytes'])) * 100 + if volume['capacityBytes'] != 0: + volume['usedBytesPercentage'] = float(float( + volume['usedBytes'] / volume['capacityBytes'])) * 100 - volume['inodesUsedPercentage'] = float(float( - volume['inodesUsed'] / volume['inodes'])) * 100 + if volume['inodes'] != 0: + volume['inodesUsedPercentage'] = float(float( + volume['inodesUsed'] / volume['inodes'])) * 100 for key in ['name', 'pvcRef', 'time', 'availableBytes', 'inodesFree']: volume.pop(key, None) @@ -109,7 +110,13 @@ def get_pvc_volumes_for_all_nodes(api: CoreV1Api, timeout: int, namespace_exclud class Pvc(K8sObject): - object_type = 'pvc' + object_type = "pvc" + + def get_list(self): + return get_pvc_volumes_for_all_nodes(api=self.manager.api, + timeout=self.manager.config.k8s_api_request_timeout_seconds, + namespace_exclude_re=self.manager.config.namespace_exclude_re, + resource_manager=self.manager) @property def resource_data(self): @@ -118,7 +125,7 @@ def resource_data(self): def get_zabbix_metrics(self): data_to_send = list() - for key, value in self.data['item'].items(): + for key, value in self.data["item"].items(): data_to_send.append( ZabbixMetric( self.zabbix_host, diff --git a/k8sobjects/secret.py b/k8sobjects/secret.py index 4acbc5f..60b04ee 100644 --- a/k8sobjects/secret.py +++ b/k8sobjects/secret.py @@ -8,18 +8,18 @@ from .k8sobject import K8sObject -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class Secret(K8sObject): - object_type = 'secret' + object_type = "secret" @property def resource_data(self): data = super().resource_data - if 'data' not in self.data or not self.data['data']: - logger.debug('No data for tls_cert "' + self.name_space + '/' + self.name + '"', self.data) + if "data" not in self.data or not self.data["data"]: + logger.debug('No data for tls_cert "' + self.name_space + "/" + self.name + '"', self.data) return data if "tls.crt" not in self.data["data"]: @@ -27,22 +27,25 @@ def resource_data(self): base64_decode = base64.b64decode(self.data["data"]["tls.crt"]) cert = x509.load_pem_x509_certificate(base64_decode, default_backend()) - data['valid_days'] = (cert.not_valid_after - datetime.datetime.now()).days + data["valid_days"] = (cert.not_valid_after - datetime.datetime.now()).days return data def get_zabbix_metrics(self): data = self.resource_data data_to_send = list() - if 'valid_days' not in data: + if "valid_days" not in data: return data_to_send - data_to_send.append(ZabbixMetric( - self.zabbix_host, 'check_kubernetesd[get,secret,' + self.name_space + ',' + self.name + ',valid_days]', - data['valid_days']) + data_to_send.append( + ZabbixMetric( + self.zabbix_host, + "check_kubernetesd[get,secret," + self.name_space + "," + self.name + ",valid_days]", + data["valid_days"], + ) ) return data_to_send def get_zabbix_discovery_data(self): if self.data["data"] is not None and "tls.crt" in dict(self.data["data"]): return super().get_zabbix_discovery_data() - return '' + return "" diff --git a/k8sobjects/service.py b/k8sobjects/service.py index 846d3cc..452d243 100644 --- a/k8sobjects/service.py +++ b/k8sobjects/service.py @@ -2,18 +2,21 @@ from .k8sobject import K8sObject -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class Service(K8sObject): - object_type = 'service' + object_type = "service" + + def get_list(self): + return self.manager.api.list_service_for_all_namespaces() @property def resource_data(self): data = super().resource_data - data['is_ingress'] = False + data["is_ingress"] = False if self.data["status"]["load_balancer"]["ingress"] is not None: - data['is_ingress'] = True + data["is_ingress"] = True return data def get_zabbix_metrics(self): diff --git a/k8sobjects/statefulset.py b/k8sobjects/statefulset.py index 6241446..1c31bfe 100644 --- a/k8sobjects/statefulset.py +++ b/k8sobjects/statefulset.py @@ -4,11 +4,14 @@ from .k8sobject import K8sObject, transform_value -logger = logging.getLogger(__file__) +logger = logging.getLogger("k8s-zabbix") class Statefulset(K8sObject): - object_type = 'statefulset' + object_type = "statefulset" + + def get_list(self): + return self.manager.api.list_stateful_set_for_all_namespaces() @property def resource_data(self): diff --git a/kubernetes/incluster/01_monitoring-user.yaml b/kubernetes/incluster/01_monitoring-user.yaml index 8eba4e1..2b4fc76 100644 --- a/kubernetes/incluster/01_monitoring-user.yaml +++ b/kubernetes/incluster/01_monitoring-user.yaml @@ -21,6 +21,9 @@ rules: - apiGroups: - "" resources: + - namespaces + - endpoints + - pvcs - pods - nodes - nodes/proxy diff --git a/kubernetes/token/01_monitoring-user.yaml b/kubernetes/token/01_monitoring-user.yaml index 8eba4e1..aaf6046 100644 --- a/kubernetes/token/01_monitoring-user.yaml +++ b/kubernetes/token/01_monitoring-user.yaml @@ -10,7 +10,15 @@ kind: ServiceAccount metadata: name: monitoring namespace: monitoring - +--- +apiVersion: v1 +kind: Secret +type: kubernetes.io/service-account-token +metadata: + name: monitoring-token + namespace: monitoring + annotations: + kubernetes.io/service-account.name: monitoring --- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 diff --git a/raw_client.py b/raw_client.py new file mode 100644 index 0000000..d7d187c --- /dev/null +++ b/raw_client.py @@ -0,0 +1,35 @@ +import sys + +from kubernetes import client +from kubernetes import config as kube_config +from base.config import ClusterAccessConfigType, Configuration +from base.daemon_thread import KubernetesApi + + +class RawClient: + def __init__(self, ini_file): + config = Configuration() + config.load_config_file(ini_file) + self.config = config + + if config.k8s_config_type is ClusterAccessConfigType.INCLUSTER: + kube_config.load_incluster_config() + self.api_client = client.ApiClient() + elif config.k8s_config_type is ClusterAccessConfigType.KUBECONFIG: + kube_config.load_kube_config() + self.api_client = kube_config.new_client_from_config() + elif config.k8s_config_type is ClusterAccessConfigType.TOKEN: + self.api_configuration = client.Configuration() + self.api_configuration.host = config.k8s_api_host + self.api_configuration.verify_ssl = config.verify_ssl + self.api_configuration.api_key = {"authorization": "Bearer " + config.k8s_api_token} + self.api_client = client.ApiClient(self.api_configuration) + else: + self.logger.fatal(f"k8s_config_type = {config.k8s_config_type} is not implemented") + sys.exit(1) + + self.apis = { + 'core_v1': KubernetesApi(self.api_client).core_v1, + 'apps_v1': KubernetesApi(self.api_client).apps_v1, + 'extensions_v1': KubernetesApi(self.api_client).extensions_v1 + } diff --git a/template/create_template_documentation b/template/create_template_documentation index 9d3f001..8714a40 100755 --- a/template/create_template_documentation +++ b/template/create_template_documentation @@ -1,22 +1,21 @@ #!/bin/bash -set -x +set -x BDIR="$(dirname $(readlink -f $0))" cd $BDIR || exit 1 -if ( !(type xalan >/dev/null 2>&1) );then - echo "INFO: skipping documentation generation because xalan is not installed" - echo " (apt-get install xalan)" - exit 0 +if (!(type xalan >/dev/null 2>&1)); then + echo "INFO: skipping documentation generation because xalan is not installed" + echo " (apt-get install xalan)" + exit 0 fi rm -rf documentation mkdir documentation -for i in custom*.xml; do - echo "=> $i" - DOCUFILE="../documentation/template/${i%%.xml}.html" - DOCUFILE="$(echo $DOCUFILE|tr ' ' '_')" - xalan -in "$i" -out $DOCUFILE -xsl transform.xsl; +for i in custom*.xml; do + echo "=> $i" + DOCUFILE="../documentation/template/${i%%.xml}.html" + DOCUFILE="$(echo $DOCUFILE | tr ' ' '_')" + xalan -in "$i" -out $DOCUFILE -xsl transform.xsl done - diff --git a/template/custom_service_kubernetes.xml b/template/custom_service_kubernetes.xml index 69c8cbb..902dd10 100644 --- a/template/custom_service_kubernetes.xml +++ b/template/custom_service_kubernetes.xml @@ -1,13 +1,7 @@ 5.4 - 2022-01-03T10:25:43Z - - - 7df96b18c230490a9a0a9e2307226338 - Templates - - + 2023-07-06T08:32:16Z