From c68b404c8b04d3f1d229857815801cb37771af90 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Tue, 18 Jul 2023 08:03:45 +0100 Subject: [PATCH 01/24] When loading plugin configs, catch `KeyError` on unavailable plugins and list all available. Signed-off-by: Pedro Algarvio --- changelog/68.improvement.rst | 3 +++ src/saf/models.py | 30 +++++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 changelog/68.improvement.rst diff --git a/changelog/68.improvement.rst b/changelog/68.improvement.rst new file mode 100644 index 0000000..6be105b --- /dev/null +++ b/changelog/68.improvement.rst @@ -0,0 +1,3 @@ +Several improvements to the project + +* When loading plugin configs, catch `KeyError` on unavailable plugins and list all available. diff --git a/src/saf/models.py b/src/saf/models.py index 82dfd44..bd82c86 100644 --- a/src/saf/models.py +++ b/src/saf/models.py @@ -109,7 +109,15 @@ def loaded_plugin(self: CCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ - return PluginsList.instance().collectors[self.plugin] + try: + return PluginsList.instance().collectors[self.plugin] + except KeyError as exc: + log.warning( + "Failed to load %r collector plugin. Available collector plugins: %s", + self.plugin, + list(PluginsList.instance().collectors), + ) + raise exc from None PCB = TypeVar("PCB", bound="ProcessConfigBase") @@ -125,7 +133,15 @@ def loaded_plugin(self: PCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ - return PluginsList.instance().processors[self.plugin] + try: + return PluginsList.instance().processors[self.plugin] + except KeyError as exc: + log.warning( + "Failed to load %r processor plugin. 
Available processor plugins: %s", + self.plugin, + list(PluginsList.instance().processors), + ) + raise exc from None FCB = TypeVar("FCB", bound="ForwardConfigBase") @@ -141,7 +157,15 @@ def loaded_plugin(self: FCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ - return PluginsList.instance().forwarders[self.plugin] + try: + return PluginsList.instance().forwarders[self.plugin] + except KeyError as exc: + log.warning( + "Failed to load %r forwarder plugin. Available forwarder plugins: %s", + self.plugin, + list(PluginsList.instance().forwarders), + ) + raise exc from None PC = TypeVar("PC", bound="PipelineConfig") From afffb1626d3ecc184ee8c664bfe76a4a7b762232 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 14:21:19 +0100 Subject: [PATCH 02/24] Fix regression which prevented pipelines without processors Signed-off-by: Pedro Algarvio --- changelog/68.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/68.bugfix.rst diff --git a/changelog/68.bugfix.rst b/changelog/68.bugfix.rst new file mode 100644 index 0000000..804c729 --- /dev/null +++ b/changelog/68.bugfix.rst @@ -0,0 +1 @@ +Fix regression which prevented pipelines without processors From 24076caea9c1f03b053323630acf41fa8753b702 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 14:24:37 +0100 Subject: [PATCH 03/24] Pass the event tag and stamp explicitly when calling `saf.utils.eventbus._construct_event()` Signed-off-by: Pedro Algarvio --- changelog/68.improvement.rst | 1 + src/saf/utils/eventbus.py | 20 +++++++++++--------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/changelog/68.improvement.rst b/changelog/68.improvement.rst index 6be105b..683557f 100644 --- a/changelog/68.improvement.rst +++ b/changelog/68.improvement.rst @@ -1,3 +1,4 @@ Several improvements to the project * When loading plugin configs, catch `KeyError` on unavailable plugins and list all available. 
+* Pass the event tag and stamp explicitly when calling `saf.utils.eventbus._construct_event()` diff --git a/src/saf/utils/eventbus.py b/src/saf/utils/eventbus.py index 4c1b631..40099f2 100644 --- a/src/saf/utils/eventbus.py +++ b/src/saf/utils/eventbus.py @@ -19,12 +19,13 @@ from saf.models import SaltEvent if TYPE_CHECKING: + from datetime import datetime from queue import Queue log = logging.getLogger(__name__) -def _construct_event(event_data: dict[str, Any]) -> SaltEvent | None: +def _construct_event(tag: str, stamp: datetime, event_data: dict[str, Any]) -> SaltEvent | None: """ Construct a :py:class:`~saf.models.SaltEvent` from a salt event payload. """ @@ -37,9 +38,9 @@ def _construct_event(event_data: dict[str, Any]) -> SaltEvent | None: if key.startswith("_"): event_data.pop(key) salt_event = SaltEvent( - tag=event_data["tag"], - stamp=event_raw_data["_stamp"], - data=event_data["data"], + tag=tag, + stamp=stamp, + data=event_data.get("data") or event_data, raw_data=event_raw_data, ) log.debug("Constructed SaltEvent: %s", salt_event) @@ -80,9 +81,6 @@ def _process_events( for tag in tags: try: if fnmatch.fnmatch(beacon_event_data["tag"], tag): - if "_stamp" not in beacon_event_data: - # Wrapped beacon data usually lack the _stamp key/value pair. Use parent's. 
- beacon_event_data["_stamp"] = event_data["_stamp"] # Unwrap the nested data key/value pair if needed if "data" in beacon_event_data["data"]: beacon_event_data["data"] = beacon_event_data["data"].pop( @@ -93,7 +91,11 @@ def _process_events( beacon_event_data["tag"], beacon_event_data["data"], ) - salt_event = _construct_event(beacon_event_data) + salt_event = _construct_event( + beacon_event_data["tag"], + event_data["_stamp"], + beacon_event_data, + ) if salt_event: events_queue.put_nowait(salt_event) # We found a matching tag, stop iterating tags @@ -109,7 +111,7 @@ def _process_events( for tag in tags: if fnmatch.fnmatch(event_tag, tag): log.debug("Matching event; TAG: %r DATA: %r", event_tag, event_data) - salt_event = _construct_event(event_data) + salt_event = _construct_event(event_tag, event_data["_stamp"], event_data) if salt_event: events_queue.put_nowait(salt_event) # We found a matching tag, stop iterating tags From 88fa32006afa294ffc049b4ba2638ff8e6f3b657 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 14:25:51 +0100 Subject: [PATCH 04/24] Add the `id` attribute to `BeaconCollectedEvent`, which is minion ID. Signed-off-by: Pedro Algarvio --- changelog/68.improvement.rst | 1 + pyproject.toml | 2 ++ src/saf/collect/beacons.py | 9 +++++++++ 3 files changed, 12 insertions(+) diff --git a/changelog/68.improvement.rst b/changelog/68.improvement.rst index 683557f..56c78a4 100644 --- a/changelog/68.improvement.rst +++ b/changelog/68.improvement.rst @@ -2,3 +2,4 @@ Several improvements to the project * When loading plugin configs, catch `KeyError` on unavailable plugins and list all available. * Pass the event tag and stamp explicitly when calling `saf.utils.eventbus._construct_event()` +* Add the `id` attribute to `BeaconCollectedEvent`, which is minion ID. 
diff --git a/pyproject.toml b/pyproject.toml index 09a0538..2431e6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,8 @@ ignore = [ "PERF203", # `try`-`except` within a loop incurs performance overhead" "PERF401", # Use a list comprehension to create a transformed list "PERF402", # Use `list` or `list.copy` to create a copy of a list + "FIX002", # Line contains TODO" + "TD003", # Missing issue link on the line following this TODO" ] ignore-init-module-imports = true builtins = [ diff --git a/src/saf/collect/beacons.py b/src/saf/collect/beacons.py index f517988..22bf4a2 100644 --- a/src/saf/collect/beacons.py +++ b/src/saf/collect/beacons.py @@ -46,6 +46,7 @@ class BeaconCollectedEvent(CollectedEvent): """ beacon: str + id: str # noqa: A003 tag: str stamp: datetime raw_data: Dict[str, Any] @@ -84,8 +85,16 @@ async def collect(*, ctx: PipelineRunContext[BeaconsConfig]) -> AsyncIterator[Be tags = {f"salt/beacon/*/{beacon}/*" for beacon in config.beacons} log.info("The beacons collect plugin is configured to listen to tags: %s", tags) async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=tags): + if "beacon_name" not in salt_event.raw_data: + # TODO @s0undt3ch: We're listening master side, and the payload is not the same... Fix it? + continue + daemon_id = salt_event.data.get("id") + if daemon_id is None: + tag_parts = salt_event.tag.split("/") + daemon_id = tag_parts[2] yield BeaconCollectedEvent( beacon=salt_event.raw_data["beacon_name"], + id=daemon_id, tag=salt_event.tag, stamp=salt_event.stamp, data=salt_event.data, From 50b03e16cc40359deadf6eba1e4eb5571a777f19 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 14:34:00 +0100 Subject: [PATCH 05/24] Add an elastic search forwarder (inc. 
example usage) Fixes #69 Signed-off-by: Pedro Algarvio --- .dockerignore | 1 + changelog/69.improvement.rst | 1 + docker/elastic/.env | 30 +++++ docker/elastic/centosstream8.Dockerfile | 41 ++++++ docker/elastic/centosstream9.Dockerfile | 54 ++++++++ docker/elastic/conf/beacons.conf | 17 +++ docker/elastic/conf/salt-analytics.conf | 31 +++++ docker/elastic/conf/supervisord.conf | 29 ++++ docker/elastic/conf/supervisord.master.conf | 2 + docker/elastic/conf/supervisord.minion.conf | 2 + docker/elastic/debian10.Dockerfile | 41 ++++++ docker/elastic/debian11.Dockerfile | 54 ++++++++ docker/elastic/docker-compose.yml | 127 ++++++++++++++++++ examples/requirements/all.txt | 1 + examples/requirements/elasticsearch.txt | 1 + examples/setup.cfg | 9 ++ .../safexamples/process/beacons_to_es.py | 51 +++++++ requirements/elasticsearch.txt | 1 + setup.cfg | 2 + src/saf/forward/elasticsearch.py | 113 ++++++++++++++++ 20 files changed, 608 insertions(+) create mode 100644 .dockerignore create mode 100644 changelog/69.improvement.rst create mode 100644 docker/elastic/.env create mode 100644 docker/elastic/centosstream8.Dockerfile create mode 100644 docker/elastic/centosstream9.Dockerfile create mode 100644 docker/elastic/conf/beacons.conf create mode 100644 docker/elastic/conf/salt-analytics.conf create mode 100644 docker/elastic/conf/supervisord.conf create mode 100644 docker/elastic/conf/supervisord.master.conf create mode 100644 docker/elastic/conf/supervisord.minion.conf create mode 100644 docker/elastic/debian10.Dockerfile create mode 100644 docker/elastic/debian11.Dockerfile create mode 100644 docker/elastic/docker-compose.yml create mode 100644 examples/requirements/elasticsearch.txt create mode 100644 examples/src/saltext/safexamples/process/beacons_to_es.py create mode 100644 requirements/elasticsearch.txt create mode 100644 src/saf/forward/elasticsearch.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..9b89590 --- /dev/null +++ 
b/.dockerignore @@ -0,0 +1 @@ +RADIO diff --git a/changelog/69.improvement.rst b/changelog/69.improvement.rst new file mode 100644 index 0000000..1aad54e --- /dev/null +++ b/changelog/69.improvement.rst @@ -0,0 +1 @@ +Add an Elastic Search forwarder(inc. example usage) diff --git a/docker/elastic/.env b/docker/elastic/.env new file mode 100644 index 0000000..3702ec4 --- /dev/null +++ b/docker/elastic/.env @@ -0,0 +1,30 @@ +# Password for the 'elastic' user (at least 6 characters) +ELASTIC_PASSWORD=elastic + +# Password for the 'kibana_system' user (at least 6 characters) +KIBANA_PASSWORD=kibana + +# Version of Elastic products +#STACK_VERSION=8.8.2 +STACK_VERSION=7.17.11 + +# Set the cluster name +CLUSTER_NAME=docker-cluster + +# Set to 'basic' or 'trial' to automatically start the 30-day trial +LICENSE=basic +#LICENSE=trial + +# Port to expose Elasticsearch HTTP API to the host +ES_PORT=9200 +#ES_PORT=127.0.0.1:9200 + +# Port to expose Kibana to the host +KIBANA_PORT=5601 +#KIBANA_PORT=80 + +# Increase or decrease based on the available host memory (in bytes) +MEM_LIMIT=2073741824 + +# Project namespace (defaults to the current folder name if not set) +#COMPOSE_PROJECT_NAME=myproject diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile new file mode 100644 index 0000000..eac3024 --- /dev/null +++ b/docker/elastic/centosstream8.Dockerfile @@ -0,0 +1,41 @@ +FROM ghcr.io/saltstack/salt-ci-containers/centos-stream:8 as base + +ENV LANG=C.UTF-8 +ENV LANGUAGE=C.UTF-8 +RUN ln -sf /etc/localtime /usr/share/zoneinfo/America/Denver + +RUN dnf update -y \ + && dnf upgrade -y \ + && dnf install -y sed vim tmux sudo tree net-tools bind-utils lsof nmap which binutils iputils epel-release procps \ + && dnf install -y --allowerasing curl \ + && dnf install -y multitail supervisor + +RUN mkdir -p /etc/supervisor/conf.d/ +ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf + +RUN rpm --import 
https://repo.saltproject.io/salt/py3/redhat/9/x86_64/SALT-PROJECT-GPG-PUBKEY-2023.pub \ + && curl -fsSL https://repo.saltproject.io/salt/py3/redhat/9/x86_64/3006.repo | tee /etc/yum.repos.d/salt.repo \ + && dnf install -y salt + +COPY ../../dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \ + && rm -f /src/*.whl + +COPY ../../examples/dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \ + && rm -f /src/*.whl + + +FROM base as minion-2 + +ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf +ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf +ADD docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +RUN mkdir -p /etc/salt/minion.d \ + && echo 'id: minion-2' > /etc/salt/minion.d/id.conf \ + && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ + && dnf install -y salt-minion + +CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile new file mode 100644 index 0000000..49eaed2 --- /dev/null +++ b/docker/elastic/centosstream9.Dockerfile @@ -0,0 +1,54 @@ +FROM ghcr.io/saltstack/salt-ci-containers/centos-stream:9 as base + +ENV LANG=C.UTF-8 +ENV LANGUAGE=C.UTF-8 +RUN ln -sf /etc/localtime /usr/share/zoneinfo/America/Denver + +RUN dnf update -y \ + && dnf upgrade -y \ + && dnf install -y sed vim tmux sudo tree net-tools bind-utils lsof nmap which binutils iputils epel-release procps \ + && dnf install -y --allowerasing curl \ + && dnf install -y multitail supervisor + +RUN mkdir -p /etc/supervisor/conf.d/ +ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf + +RUN rpm --import https://repo.saltproject.io/salt/py3/redhat/9/x86_64/SALT-PROJECT-GPG-PUBKEY-2023.pub \ + && curl -fsSL 
https://repo.saltproject.io/salt/py3/redhat/9/x86_64/3006.repo | tee /etc/yum.repos.d/salt.repo \ + && dnf install -y salt + +COPY ../../dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \ + && rm -f /src/*.whl + +COPY ../../examples/dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \ + && rm -f /src/*.whl + + +FROM base as master-1 + +ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf +ADD docker/elastic/conf/beacons.conf /etc/salt/master.d/beacons.conf +ADD docker/elastic/conf/salt-analytics.conf /etc/salt/master.d/salt-analytics.conf +RUN mkdir -p /etc/salt/master.d \ + && echo 'id: master-1' > /etc/salt/master.d/id.conf \ + && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ + && dnf install -y salt-master + +CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] + + +FROM base as minion-1 + +ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf +ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf +ADD docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +RUN mkdir -p /etc/salt/minion.d \ + && echo 'id: minion-1' > /etc/salt/minion.d/id.conf \ + && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ + && dnf install -y salt-minion + +CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/conf/beacons.conf b/docker/elastic/conf/beacons.conf new file mode 100644 index 0000000..c15fdde --- /dev/null +++ b/docker/elastic/conf/beacons.conf @@ -0,0 +1,17 @@ +beacons: +# diskusage: +# - '^\/(?!home).*$': 1% +# - '^[a-zA-Z]:\\$': 1% +# - interval: 5 + memusage: + - percent: 1% + - interval: 5 +# salt_monitor: +# - salt_fun: +# - test.ping +# - interval: 5 +## status: +## - interval: 5 +## swapusage: +## - percent: 1% +## - interval: 5 
diff --git a/docker/elastic/conf/salt-analytics.conf b/docker/elastic/conf/salt-analytics.conf new file mode 100644 index 0000000..a7cf3c6 --- /dev/null +++ b/docker/elastic/conf/salt-analytics.conf @@ -0,0 +1,31 @@ +engines: + - analytics + +analytics: + + collectors: + beacons-collector: + plugin: beacons + beacons: + - "*" + + processors: + cast-to-es: + plugin: beacons_to_es + + forwarders: + elasticsearch-forwarder: + plugin: elasticsearch + hosts: + - http://node01:9200 + use_ssl: false + verify_ssl: false + + pipelines: + elastic-pipeline: + collect: + - beacons-collector + process: + - cast-to-es + forward: + - elasticsearch-forwarder diff --git a/docker/elastic/conf/supervisord.conf b/docker/elastic/conf/supervisord.conf new file mode 100644 index 0000000..c48ac50 --- /dev/null +++ b/docker/elastic/conf/supervisord.conf @@ -0,0 +1,29 @@ +[supervisord] +user=root +logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log) +logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB) +logfile_backups=10 ; (num of main logfile rotation backups;default 10) +loglevel=info ; (logging level;default info; others: debug,warn) +pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid) +nodaemon=true ; (start in foreground if true;default false) +minfds=1024 ; (min. avail startup file descriptors;default 1024) +minprocs=200 ; (min. 
avail process descriptors;default 200) + +[unix_http_server] +file=/var/tmp/supervisor.sock +username=setup +password=setup + +[supervisorctl] +serverurl=unix:///var/tmp/supervisor.sock ; use a unix:// URL for a unix socket +username=setup +password=setup + +; the below section must remain in the config file for RPC +; (supervisorctl/web interface) to work, additional interfaces may be +; added by defining them in separate rpcinterface: sections +[rpcinterface:supervisor] +supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface + +[include] +files = /etc/supervisor/conf.d/*.conf diff --git a/docker/elastic/conf/supervisord.master.conf b/docker/elastic/conf/supervisord.master.conf new file mode 100644 index 0000000..1a85cc2 --- /dev/null +++ b/docker/elastic/conf/supervisord.master.conf @@ -0,0 +1,2 @@ +[program:salt-master] +command=/opt/saltstack/salt/salt-master -l debug diff --git a/docker/elastic/conf/supervisord.minion.conf b/docker/elastic/conf/supervisord.minion.conf new file mode 100644 index 0000000..4827aa9 --- /dev/null +++ b/docker/elastic/conf/supervisord.minion.conf @@ -0,0 +1,2 @@ +[program:salt-minion] +command=/opt/saltstack/salt/salt-minion -l debug diff --git a/docker/elastic/debian10.Dockerfile b/docker/elastic/debian10.Dockerfile new file mode 100644 index 0000000..e7b3dbc --- /dev/null +++ b/docker/elastic/debian10.Dockerfile @@ -0,0 +1,41 @@ +FROM ghcr.io/saltstack/salt-ci-containers/debian:10 as base + +ENV LANG=C.UTF-8 +ENV LANGUAGE=C.UTF-8 +RUN ln -sf /etc/localtime /usr/share/zoneinfo/America/Denver + +RUN apt update \ + && apt upgrade -y \ + && apt install -y curl sed vim tmux sudo tree net-tools bind9utils lsof nmap binutils multitail supervisor iputils-ping procps + +RUN mkdir -p /etc/supervisor/conf.d/ +ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf + +RUN mkdir /etc/apt/keyrings \ + && curl -fsSL -o /etc/apt/keyrings/salt-archive-keyring-2023.gpg 
https://repo.saltproject.io/salt/py3/debian/11/amd64/SALT-PROJECT-GPG-PUBKEY-2023.gpg \ + && echo "deb [signed-by=/etc/apt/keyrings/salt-archive-keyring-2023.gpg arch=amd64] https://repo.saltproject.io/salt/py3/debian/11/amd64/3006 bullseye main" | tee /etc/apt/sources.list.d/salt.list \ + && apt update \ + && apt install -y salt-common + +COPY ../../dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \ + && rm -f /src/*.whl + +COPY ../../examples/dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \ + && rm -f /src/*.whl + + +FROM base as minion-4 + +ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf +ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf +ADD docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +RUN mkdir -p /etc/salt/minion.d \ + && echo 'id: minion-4' > /etc/salt/minion.d/id.conf \ + && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ + && apt install -y salt-minion + +CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile new file mode 100644 index 0000000..3040f43 --- /dev/null +++ b/docker/elastic/debian11.Dockerfile @@ -0,0 +1,54 @@ +FROM ghcr.io/saltstack/salt-ci-containers/debian:11 as base + +ENV LANG=C.UTF-8 +ENV LANGUAGE=C.UTF-8 +RUN ln -sf /etc/localtime /usr/share/zoneinfo/America/Denver + +RUN apt update \ + && apt upgrade -y \ + && apt install -y curl sed vim tmux sudo tree net-tools bind9-utils lsof nmap binutils multitail supervisor iputils-ping procps + +RUN mkdir -p /etc/supervisor/conf.d/ +ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf + +RUN mkdir /etc/apt/keyrings \ + && curl -fsSL -o /etc/apt/keyrings/salt-archive-keyring-2023.gpg 
https://repo.saltproject.io/salt/py3/debian/11/amd64/SALT-PROJECT-GPG-PUBKEY-2023.gpg \ + && echo "deb [signed-by=/etc/apt/keyrings/salt-archive-keyring-2023.gpg arch=amd64] https://repo.saltproject.io/salt/py3/debian/11/amd64/3006 bullseye main" | tee /etc/apt/sources.list.d/salt.list \ + && apt update \ + && apt install -y salt-common + +COPY ../../dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \ + && rm -f /src/*.whl + +COPY ../../examples/dist/salt*.whl /src/ +RUN ls -lah /src \ + && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \ + && rm -f /src/*.whl + + +FROM base as master-2 + +ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf +ADD docker/elastic/conf/beacons.conf /etc/salt/master.d/beacons.conf +ADD docker/elastic/conf/salt-analytics.conf /etc/salt/master.d/salt-analytics.conf +RUN mkdir -p /etc/salt/master.d \ + && echo 'id: master-2' > /etc/salt/master.d/id.conf \ + && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ + && apt install -y salt-master + +CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] + + +FROM base as minion-3 + +ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf +ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf +ADD docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +RUN mkdir -p /etc/salt/minion.d \ + && echo 'id: minion-3' > /etc/salt/minion.d/id.conf \ + && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ + && apt install -y salt-minion + +CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/docker-compose.yml b/docker/elastic/docker-compose.yml new file mode 100644 index 0000000..5e2902c --- /dev/null +++ b/docker/elastic/docker-compose.yml @@ -0,0 +1,127 @@ +version: '3.8' +services: + node01: + image: 
docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION} + container_name: node01 + environment: + - node.name=node01 + - cluster.name=es-cluster-7 + - discovery.type=single-node + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - es-data01:/usr/share/elasticsearch/data + ports: + - ${ES_PORT}:9200 + networks: + - es-network + + kibana: + image: docker.elastic.co/kibana/kibana:${STACK_VERSION} + container_name: kibana + environment: + ELASTICSEARCH_HOSTS: http://node01:9200 + ports: + - ${KIBANA_PORT}:5601 + networks: + - es-network + depends_on: + - node01 + + metricbeat: + image: docker.elastic.co/beats/metricbeat:${STACK_VERSION} + container_name: metricbeat + environment: + ELASTICSEARCH_HOSTS: http://node01:9200 + volumes: + - metricbeat-data01:/usr/share/metricbeat/data + networks: + - es-network + depends_on: + - node01 + + master-1: + depends_on: + - kibana + container_name: master-1 + build: + context: ../../ + dockerfile: docker/elastic/centosstream9.Dockerfile + target: master-1 + networks: + es-network: + aliases: + - master-1 + minion-1: + depends_on: + - kibana + container_name: minion-1 + build: + context: ../../ + dockerfile: docker/elastic/centosstream9.Dockerfile + target: minion-1 + networks: + es-network: + aliases: + - minion-1 + minion-2: + depends_on: + - kibana + container_name: minion-2 + build: + context: ../../ + dockerfile: docker/elastic/centosstream8.Dockerfile + target: minion-2 + networks: + es-network: + aliases: + - minion-2 + master-2: + depends_on: + - kibana + container_name: master-2 + build: + context: ../../ + dockerfile: docker/elastic/debian11.Dockerfile + target: master-2 + networks: + es-network: + aliases: + - master-2 + minion-3: + depends_on: + - kibana + container_name: minion-3 + build: + context: ../../ + dockerfile: docker/elastic/debian11.Dockerfile + target: minion-3 + networks: + es-network: + aliases: + - minion-3 + minion-4: + depends_on: + - kibana + 
container_name: minion-4 + build: + context: ../../ + dockerfile: docker/elastic/debian10.Dockerfile + target: minion-4 + networks: + es-network: + aliases: + - minion-4 + +volumes: + es-data01: + driver: local + metricbeat-data01: + driver: local + +networks: + es-network: + driver: bridge diff --git a/examples/requirements/all.txt b/examples/requirements/all.txt index 3acf00e..7d81ef1 100644 --- a/examples/requirements/all.txt +++ b/examples/requirements/all.txt @@ -1,2 +1,3 @@ +-r elasticsearch.txt -r mnist-notebook.txt -r mnist.txt diff --git a/examples/requirements/elasticsearch.txt b/examples/requirements/elasticsearch.txt new file mode 100644 index 0000000..8c7dabc --- /dev/null +++ b/examples/requirements/elasticsearch.txt @@ -0,0 +1 @@ +elasticsearch[async] diff --git a/examples/setup.cfg b/examples/setup.cfg index 9c48d80..00a7264 100644 --- a/examples/setup.cfg +++ b/examples/setup.cfg @@ -48,6 +48,7 @@ saf.process = mnist_network = saltext.safexamples.process.mnist_network notebook_output = saltext.safexamples.process.notebook_output numpy_save_keys = saltext.safexamples.process.numpy_save_keys + beacons_to_es = saltext.safexamples.process.beacons_to_es [bdist_wheel] # Use this option if your package is pure-python @@ -56,3 +57,11 @@ universal = 1 [sdist] owner = root group = root + + +[requirements-files] +extras_require = + all = requirements/all.txt + elasticsearch = requirements/elasticsearch.txt + mnist = requirements/mnist.txt + mnist-notebook = requirements/mnist-notebook.txt diff --git a/examples/src/saltext/safexamples/process/beacons_to_es.py b/examples/src/saltext/safexamples/process/beacons_to_es.py new file mode 100644 index 0000000..93326b5 --- /dev/null +++ b/examples/src/saltext/safexamples/process/beacons_to_es.py @@ -0,0 +1,51 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Convert beacon event to an elastic search index event. 
+""" +from __future__ import annotations + +import logging +import uuid +from typing import TYPE_CHECKING +from typing import Any +from typing import AsyncIterator +from typing import Type +from typing import cast + +from saf.forward.elasticsearch import ElasticSearchEvent +from saf.models import PipelineRunContext +from saf.models import ProcessConfigBase + +if TYPE_CHECKING: + from saf.collect.beacons import BeaconCollectedEvent +log = logging.getLogger(__name__) + + +class BeaconToESConfig(ProcessConfigBase): + """ + Processor configuration. + """ + + +def get_config_schema() -> Type[BeaconToESConfig]: + """ + Get the test collect plugin configuration schema. + """ + return BeaconToESConfig + + +async def process( + *, + ctx: PipelineRunContext[BeaconToESConfig], + event: BeaconCollectedEvent, +) -> AsyncIterator[ElasticSearchEvent]: + """ + Method called to collect events, in this case, generate. + """ + data = cast(dict[str, Any], event.data).copy() + data["role"] = ctx.info.salt.role + if TYPE_CHECKING: + assert event.timestamp + data["@timestamp"] = event.timestamp.isoformat() + yield ElasticSearchEvent.construct(index=event.beacon, id=str(uuid.uuid4()), data=data) diff --git a/requirements/elasticsearch.txt b/requirements/elasticsearch.txt new file mode 100644 index 0000000..8c7dabc --- /dev/null +++ b/requirements/elasticsearch.txt @@ -0,0 +1 @@ +elasticsearch[async] diff --git a/setup.cfg b/setup.cfg index d92d255..86afaa1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -62,6 +62,7 @@ saf.forward = disk = saf.forward.disk noop = saf.forward.noop test = saf.forward.test + elasticsearch = saf.forward.elasticsearch [requirements-files] @@ -75,6 +76,7 @@ extras_require = changelog = requirements/changelog.txt build = requirements/build.txt jupyter = requirements/jupyter.txt + elasticsearch = requirements/elasticsearch.txt [bdist_wheel] diff --git a/src/saf/forward/elasticsearch.py b/src/saf/forward/elasticsearch.py new file mode 100644 index 0000000..ac9012e --- 
/dev/null +++ b/src/saf/forward/elasticsearch.py @@ -0,0 +1,113 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +ElasticSearch forwarder. + +Requires the elasticsearch-py python library. +""" +from __future__ import annotations + +import logging +import pprint +import uuid +from datetime import datetime +from datetime import timedelta +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Type + +from pydantic import Field + +from saf.models import CollectedEvent +from saf.models import ForwardConfigBase +from saf.models import PipelineRunContext + +try: + from elasticsearch import JsonSerializer +except ImportError: + + class JsonSerializer: # type: ignore[no-redef] + """ + Simple class definition in order not to cause errors when elasticsearch fails to import. + """ + + +log = logging.getLogger(__name__) + + +class AnalyticsJsonSerializer(JsonSerializer): + """ + Custom JSON serializer. + """ + + def default(self, data: Any) -> Any: # noqa: ANN101,ANN401 + """ + Convert the passed data to a type that can be JSON serialized. + """ + if isinstance(data, set): + return list(data) + if isinstance(data, datetime): + return data.isoformat() + if isinstance(data, timedelta): + return data.total_seconds() + return super().default(data) + + +class ElasticSearchEvent(CollectedEvent): + """ + ElasticSearch Event. + """ + + index: str + id: str = Field(default_factory=lambda: str(uuid.uuid4())) # noqa: A003 + data: Dict[str, Any] + + +class ElasticSearchConfig(ForwardConfigBase): + """ + Configuration schema for the disk forward plugin. + """ + + hosts: List[str] + username: Optional[str] = None + password: Optional[str] = None + timeout: int = 10 + + +def get_config_schema() -> Type[ElasticSearchConfig]: + """ + Get the ElasticSearch forwarder configuration schema. 
+ """ + return ElasticSearchConfig + + +async def forward( + *, + ctx: PipelineRunContext[ElasticSearchConfig], + event: ElasticSearchEvent, +) -> None: + """ + Method called to forward the event. + """ + from elasticsearch import AsyncElasticsearch + + if "client" not in ctx.cache: + config = ctx.config + optional_config = {} + if config.username and config.password: + optional_config["http_auth"] = (config.username, config.password) + ctx.cache["client"] = client = AsyncElasticsearch( + hosts=config.hosts, + serializers={"application/json": AnalyticsJsonSerializer()}, + **optional_config, + ) + connection_info = await client.info() + log.warning("ES Connection Info:\n%s", pprint.pformat(connection_info)) + client = ctx.cache["client"] + data = event.data.copy() + if "@timestamp" not in data: + data["@timestamp"] = event.timestamp + ret = await client.index(index=event.index, id=event.id, body=data) + log.warning("ES SEND:\n%s", pprint.pformat(ret)) From 230a99493abfbe58edefd6a7ff1e4176a58c113a Mon Sep 17 00:00:00 2001 From: MKLeb Date: Mon, 3 Jul 2023 17:44:32 -0400 Subject: [PATCH 06/24] Initial check in for job aggregations --- setup.cfg | 3 + src/saf/collect/event_bus.py | 55 ++++++++++++++++++ src/saf/collect/salt_cmd.py | 56 +++++++++++++++++++ src/saf/process/state_aggregate.py | 89 ++++++++++++++++++++++++++++++ src/saf/utils/eventbus.py | 7 ++- 5 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 src/saf/collect/event_bus.py create mode 100644 src/saf/collect/salt_cmd.py create mode 100644 src/saf/process/state_aggregate.py diff --git a/setup.cfg b/setup.cfg index 86afaa1..993bb39 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,13 +50,16 @@ salt.loader = salt-analytics-framework = saf.saltext saf.collect = beacons = saf.collect.beacons + event-bus = saf.collect.event_bus file = saf.collect.file salt_exec = saf.collect.salt_exec + salt_cmd = saf.collect.salt_cmd test = saf.collect.test saf.process = regex_mask = saf.process.regex_mask 
shannon_mask = saf.process.shannon_mask jupyter_notebook = saf.process.jupyter_notebook + state-aggregate = saf.process.state_aggregate test = saf.process.test saf.forward = disk = saf.forward.disk diff --git a/src/saf/collect/event_bus.py b/src/saf/collect/event_bus.py new file mode 100644 index 0000000..ca6e1dc --- /dev/null +++ b/src/saf/collect/event_bus.py @@ -0,0 +1,55 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Collect events from the event bus. +""" +from __future__ import annotations + +import logging +from typing import AsyncIterator +from typing import Set +from typing import Type + +from saf.models import CollectConfigBase +from saf.models import CollectedEvent +from saf.models import PipelineRunContext +from saf.models import SaltEvent +from saf.utils import eventbus + +log = logging.getLogger(__name__) + + +class EventBusConfig(CollectConfigBase): + """ + Configuration schema for the beacons collect plugin. + """ + + tags: Set[str] + + +def get_config_schema() -> Type[EventBusConfig]: + """ + Get the event bus collect plugin configuration schema. + """ + return EventBusConfig + + +class EventBusCollectedEvent(CollectedEvent): + """ + A collected event surrounding a SaltEvent. + """ + + salt_event: SaltEvent + + +async def collect( + *, ctx: PipelineRunContext[EventBusConfig] +) -> AsyncIterator[EventBusCollectedEvent]: + """ + Method called to collect events. + """ + config = ctx.config + salt_event: SaltEvent + log.info("The event bus collect plugin is configured to listen to tags: %s", config.tags) + async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=config.tags): + yield EventBusCollectedEvent(salt_event=salt_event, data={"tag": salt_event.tag}) diff --git a/src/saf/collect/salt_cmd.py b/src/saf/collect/salt_cmd.py new file mode 100644 index 0000000..647cfb4 --- /dev/null +++ b/src/saf/collect/salt_cmd.py @@ -0,0 +1,56 @@ +# Copyright 2021-2023 VMware, Inc. 
+# SPDX-License-Identifier: Apache-2.0 +""" +Collect events from the event bus. +""" +from __future__ import annotations + +import asyncio +import logging +from typing import AsyncIterator +from typing import Dict +from typing import List +from typing import Type + +from salt.client import LocalClient + +from saf.models import CollectConfigBase +from saf.models import CollectedEvent +from saf.models import PipelineRunContext + +log = logging.getLogger(__name__) + + +class SaltCommandConfig(CollectConfigBase): + """ + Configuration schema for the beacons collect plugin. + """ + + targets: str + cmd: str + args: List[str] | None + kwargs: Dict[str, str] | None + interval: float = 5 + cache_flag: str | None = None + + +def get_config_schema() -> Type[SaltCommandConfig]: + """ + Get the event bus collect plugin configuration schema. + """ + return SaltCommandConfig + + +async def collect(*, ctx: PipelineRunContext[SaltCommandConfig]) -> AsyncIterator[CollectedEvent]: + """ + Method called to collect events. + """ + config = ctx.config + client = LocalClient(mopts=ctx.salt_config.copy()) + + while True: + ret = client.cmd(config.targets, config.cmd, arg=config.args, kwarg=config.kwargs) + event = CollectedEvent(data={config._name: ret}) # noqa: SLF001 + log.debug("CollectedEvent: %s", event) + yield event + await asyncio.sleep(config.interval) diff --git a/src/saf/process/state_aggregate.py b/src/saf/process/state_aggregate.py new file mode 100644 index 0000000..4b999b0 --- /dev/null +++ b/src/saf/process/state_aggregate.py @@ -0,0 +1,89 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Aggregate the necessary job info into one event to be forwarded. 
+""" +from __future__ import annotations + +import fnmatch +import logging +from typing import TYPE_CHECKING +from typing import AsyncIterator +from typing import Type + +from saf.collect.event_bus import EventBusCollectedEvent +from saf.models import CollectedEvent +from saf.models import PipelineRunContext +from saf.models import ProcessConfigBase + +if TYPE_CHECKING: + from datetime import datetime + from datetime import timedelta + +log = logging.getLogger(__name__) + + +class StateAggregateConfig(ProcessConfigBase): + """ + Job aggregate collector configuration. + """ + + +def get_config_schema() -> Type[StateAggregateConfig]: + """ + Get the job aggregate collect plugin configuration schema. + """ + return StateAggregateConfig + + +class StateAggregateCollectedEvent(CollectedEvent): + """ + A collected event with aggregated state run information. + """ + + start_time: datetime + end_time: datetime + duration: timedelta + minion_id: str + + +async def process( + *, + ctx: PipelineRunContext[StateAggregateConfig], + event: CollectedEvent, +) -> AsyncIterator[CollectedEvent]: + """ + Aggregate received events, otherwise store in cache. 
+ """ + if isinstance(event, EventBusCollectedEvent): + salt_event = event.salt_event + tag = salt_event.tag + data = salt_event.data + if fnmatch.fnmatch(tag, "salt/job/*/new"): + # We will probably want to make this condition configurable + if TYPE_CHECKING: + assert isinstance(salt_event.data, dict) + if data.get("fun") == "state.apply": + jid = tag.split("/")[2] + if "watched_jids" not in ctx.cache: + ctx.cache["watched_jids"] = {} + # We are going to want a TTL at some point for the watched jids + ctx.cache["watched_jids"][jid] = salt_event + elif fnmatch.fnmatch(tag, "salt/job/*/ret/*"): + split_tag = tag.split("/") + jid = split_tag[2] + if "watched_jids" not in ctx.cache: + ctx.cache["watched_jids"] = {} + if jid in ctx.cache["watched_jids"]: + job_start_event = ctx.cache["watched_jids"][jid] + minion_id = split_tag[-1] + start_time = job_start_event.stamp + end_time = salt_event.stamp + duration = end_time - start_time + yield StateAggregateCollectedEvent.construct( + data=data, + start_time=start_time, + end_time=end_time, + duration=duration, + minion_id=minion_id, + ) diff --git a/src/saf/utils/eventbus.py b/src/saf/utils/eventbus.py index 40099f2..8eedcbd 100644 --- a/src/saf/utils/eventbus.py +++ b/src/saf/utils/eventbus.py @@ -37,10 +37,15 @@ def _construct_event(tag: str, stamp: datetime, event_data: dict[str, Any]) -> S for key in list(event_data): if key.startswith("_"): event_data.pop(key) + event_data_value = event_data.get("data") or event_data + if isinstance(event_data_value, str): + data = {"return": event_data_value} + else: + data = event_data_value salt_event = SaltEvent( tag=tag, stamp=stamp, - data=event_data.get("data") or event_data, + data=data, raw_data=event_raw_data, ) log.debug("Constructed SaltEvent: %s", salt_event) From 925a4405887779937ce2dcf54070304d559c8b38 Mon Sep 17 00:00:00 2001 From: MKLeb Date: Thu, 6 Jul 2023 06:40:34 +0100 Subject: [PATCH 07/24] Add grains collector and state aggregate processor --- setup.cfg | 2 +- 
src/saf/collect/{salt_cmd.py => grains.py} | 30 ++++++++++++++-------- src/saf/process/state_aggregate.py | 24 ++++++++++++++++- 3 files changed, 43 insertions(+), 13 deletions(-) rename src/saf/collect/{salt_cmd.py => grains.py} (63%) diff --git a/setup.cfg b/setup.cfg index 993bb39..871f4f1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,7 +53,7 @@ saf.collect = event-bus = saf.collect.event_bus file = saf.collect.file salt_exec = saf.collect.salt_exec - salt_cmd = saf.collect.salt_cmd + grains = saf.collect.grains test = saf.collect.test saf.process = regex_mask = saf.process.regex_mask diff --git a/src/saf/collect/salt_cmd.py b/src/saf/collect/grains.py similarity index 63% rename from src/saf/collect/salt_cmd.py rename to src/saf/collect/grains.py index 647cfb4..7b3ae40 100644 --- a/src/saf/collect/salt_cmd.py +++ b/src/saf/collect/grains.py @@ -26,12 +26,9 @@ class SaltCommandConfig(CollectConfigBase): Configuration schema for the beacons collect plugin. """ - targets: str - cmd: str - args: List[str] | None - kwargs: Dict[str, str] | None - interval: float = 5 - cache_flag: str | None = None + targets: str = "*" + grains: List[str] + interval: float = 20 def get_config_schema() -> Type[SaltCommandConfig]: @@ -41,7 +38,18 @@ def get_config_schema() -> Type[SaltCommandConfig]: return SaltCommandConfig -async def collect(*, ctx: PipelineRunContext[SaltCommandConfig]) -> AsyncIterator[CollectedEvent]: +class GrainsCollectedEvent(CollectedEvent): + """ + A collected event surrounding a SaltEvent. + """ + + minion: str + grains: Dict[str, str] + + +async def collect( + *, ctx: PipelineRunContext[SaltCommandConfig] +) -> AsyncIterator[GrainsCollectedEvent]: """ Method called to collect events. 
""" @@ -49,8 +57,8 @@ async def collect(*, ctx: PipelineRunContext[SaltCommandConfig]) -> AsyncIterato client = LocalClient(mopts=ctx.salt_config.copy()) while True: - ret = client.cmd(config.targets, config.cmd, arg=config.args, kwarg=config.kwargs) - event = CollectedEvent(data={config._name: ret}) # noqa: SLF001 - log.debug("CollectedEvent: %s", event) - yield event + ret = client.cmd(config.targets, "grains.item", arg=config.grains) + for minion, grains in ret.items(): + event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains) + yield event await asyncio.sleep(config.interval) diff --git a/src/saf/process/state_aggregate.py b/src/saf/process/state_aggregate.py index 4b999b0..20a5eb2 100644 --- a/src/saf/process/state_aggregate.py +++ b/src/saf/process/state_aggregate.py @@ -9,9 +9,11 @@ import logging from typing import TYPE_CHECKING from typing import AsyncIterator +from typing import Dict from typing import Type from saf.collect.event_bus import EventBusCollectedEvent +from saf.collect.grains import GrainsCollectedEvent from saf.models import CollectedEvent from saf.models import PipelineRunContext from saf.models import ProcessConfigBase @@ -45,6 +47,7 @@ class StateAggregateCollectedEvent(CollectedEvent): end_time: datetime duration: timedelta minion_id: str + grains: Dict[str, str] async def process( @@ -80,10 +83,29 @@ async def process( start_time = job_start_event.stamp end_time = salt_event.stamp duration = end_time - start_time - yield StateAggregateCollectedEvent.construct( + grains = ctx.cache.get("grains", {}).get(minion_id, {}) + ret = StateAggregateCollectedEvent.construct( data=data, start_time=start_time, end_time=end_time, duration=duration, minion_id=minion_id, + grains=grains, ) + if grains: + yield ret + else: + if "waiting_for_grains" not in ctx.cache: + ctx.cache["waiting_for_grains"] = set() + ctx.cache["waiting_for_grains"].add(ret) + elif isinstance(event, GrainsCollectedEvent): + if "grains" not in ctx.cache: + 
ctx.cache["grains"] = {} + ctx.cache["grains"][event.minion] = event.grains + waiting = ctx.cache.get("waiting_for_grains") + if waiting: + to_remove = [agg_event for agg_event in waiting if agg_event.minion_id == event.minion] + for event_with_grains in to_remove: + event_with_grains.grains = event.grains + waiting.remove(event_with_grains) + yield event_with_grains From 248fa1ca1146f601abf98861f0f22a0a774c9f37 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 16:50:12 +0100 Subject: [PATCH 08/24] Renamed `src/saf/process/state_aggregate.py` -> `src/saf/process/job_aggregate.py` Additionally, we listen to all job events, not just state events. Signed-off-by: Pedro Algarvio --- setup.cfg | 2 +- .../{state_aggregate.py => job_aggregate.py} | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) rename src/saf/process/{state_aggregate.py => job_aggregate.py} (80%) diff --git a/setup.cfg b/setup.cfg index 871f4f1..c7f9833 100644 --- a/setup.cfg +++ b/setup.cfg @@ -59,7 +59,7 @@ saf.process = regex_mask = saf.process.regex_mask shannon_mask = saf.process.shannon_mask jupyter_notebook = saf.process.jupyter_notebook - state-aggregate = saf.process.state_aggregate + job-aggregate = saf.process.job_aggregate test = saf.process.test saf.forward = disk = saf.forward.disk diff --git a/src/saf/process/state_aggregate.py b/src/saf/process/job_aggregate.py similarity index 80% rename from src/saf/process/state_aggregate.py rename to src/saf/process/job_aggregate.py index 20a5eb2..3440297 100644 --- a/src/saf/process/state_aggregate.py +++ b/src/saf/process/job_aggregate.py @@ -62,24 +62,27 @@ async def process( salt_event = event.salt_event tag = salt_event.tag data = salt_event.data + if "watched_jids" not in ctx.cache: + ctx.cache["watched_jids"] = {} if fnmatch.fnmatch(tag, "salt/job/*/new"): + jid = tag.split("/")[2] # We will probably want to make this condition configurable - if TYPE_CHECKING: - assert isinstance(salt_event.data, dict) 
- if data.get("fun") == "state.apply": - jid = tag.split("/")[2] - if "watched_jids" not in ctx.cache: - ctx.cache["watched_jids"] = {} - # We are going to want a TTL at some point for the watched jids - ctx.cache["watched_jids"][jid] = salt_event + if jid not in ctx.cache["watched_jids"]: + ctx.cache["watched_jids"][jid] = { + "minions": set(salt_event.data["minions"]), + "event": salt_event, + } elif fnmatch.fnmatch(tag, "salt/job/*/ret/*"): split_tag = tag.split("/") jid = split_tag[2] - if "watched_jids" not in ctx.cache: - ctx.cache["watched_jids"] = {} + minion_id = split_tag[-1] if jid in ctx.cache["watched_jids"]: - job_start_event = ctx.cache["watched_jids"][jid] - minion_id = split_tag[-1] + ctx.cache["watched_jids"][jid]["minions"].remove(minion_id) + if not ctx.cache["watched_jids"][jid]["minions"]: + # No more minions should return. Remove jid from cache + job_start_event = ctx.cache["watched_jids"].pop(jid)["event"] + else: + job_start_event = ctx.cache["watched_jids"][jid]["event"] start_time = job_start_event.stamp end_time = salt_event.stamp duration = end_time - start_time From 1a5f5ab99602fae68687013a991fdfe3e214f947 Mon Sep 17 00:00:00 2001 From: MKLeb Date: Thu, 6 Jul 2023 06:45:12 +0100 Subject: [PATCH 09/24] Make some additional improvements to the JobAggregate Processor --- src/saf/collect/grains.py | 17 ++++++++--------- src/saf/process/job_aggregate.py | 31 +++++++++++++++++++------------ 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/saf/collect/grains.py b/src/saf/collect/grains.py index 7b3ae40..a3ff74c 100644 --- a/src/saf/collect/grains.py +++ b/src/saf/collect/grains.py @@ -21,21 +21,21 @@ log = logging.getLogger(__name__) -class SaltCommandConfig(CollectConfigBase): +class GrainsConfig(CollectConfigBase): """ Configuration schema for the beacons collect plugin. 
""" targets: str = "*" grains: List[str] - interval: float = 20 + interval: float = 5 -def get_config_schema() -> Type[SaltCommandConfig]: +def get_config_schema() -> Type[GrainsConfig]: """ Get the event bus collect plugin configuration schema. """ - return SaltCommandConfig + return GrainsConfig class GrainsCollectedEvent(CollectedEvent): @@ -47,9 +47,7 @@ class GrainsCollectedEvent(CollectedEvent): grains: Dict[str, str] -async def collect( - *, ctx: PipelineRunContext[SaltCommandConfig] -) -> AsyncIterator[GrainsCollectedEvent]: +async def collect(*, ctx: PipelineRunContext[GrainsConfig]) -> AsyncIterator[GrainsCollectedEvent]: """ Method called to collect events. """ @@ -59,6 +57,7 @@ async def collect( while True: ret = client.cmd(config.targets, "grains.item", arg=config.grains) for minion, grains in ret.items(): - event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains) - yield event + if grains: + event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains) + yield event await asyncio.sleep(config.interval) diff --git a/src/saf/process/job_aggregate.py b/src/saf/process/job_aggregate.py index 3440297..f342b8f 100644 --- a/src/saf/process/job_aggregate.py +++ b/src/saf/process/job_aggregate.py @@ -10,6 +10,7 @@ from typing import TYPE_CHECKING from typing import AsyncIterator from typing import Dict +from typing import Set from typing import Type from saf.collect.event_bus import EventBusCollectedEvent @@ -25,22 +26,24 @@ log = logging.getLogger(__name__) -class StateAggregateConfig(ProcessConfigBase): +class JobAggregateConfig(ProcessConfigBase): """ Job aggregate collector configuration. """ + jobs: Set[str] -def get_config_schema() -> Type[StateAggregateConfig]: + +def get_config_schema() -> Type[JobAggregateConfig]: """ Get the job aggregate collect plugin configuration schema. 
""" - return StateAggregateConfig + return JobAggregateConfig -class StateAggregateCollectedEvent(CollectedEvent): +class JobAggregateCollectedEvent(CollectedEvent): """ - A collected event with aggregated state run information. + A collected event with aggregated job run information. """ start_time: datetime @@ -52,7 +55,7 @@ class StateAggregateCollectedEvent(CollectedEvent): async def process( *, - ctx: PipelineRunContext[StateAggregateConfig], + ctx: PipelineRunContext[JobAggregateConfig], event: CollectedEvent, ) -> AsyncIterator[CollectedEvent]: """ @@ -67,11 +70,15 @@ async def process( if fnmatch.fnmatch(tag, "salt/job/*/new"): jid = tag.split("/")[2] # We will probably want to make this condition configurable - if jid not in ctx.cache["watched_jids"]: - ctx.cache["watched_jids"][jid] = { - "minions": set(salt_event.data["minions"]), - "event": salt_event, - } + salt_func = data.get("fun", "") + for func_filter in ctx.config.jobs: + if fnmatch.fnmatch(salt_func, func_filter): + if jid not in ctx.cache["watched_jids"]: + ctx.cache["watched_jids"][jid] = { + "minions": set(data["minions"]), + "event": salt_event, + } + break elif fnmatch.fnmatch(tag, "salt/job/*/ret/*"): split_tag = tag.split("/") jid = split_tag[2] @@ -87,7 +94,7 @@ async def process( end_time = salt_event.stamp duration = end_time - start_time grains = ctx.cache.get("grains", {}).get(minion_id, {}) - ret = StateAggregateCollectedEvent.construct( + ret = JobAggregateCollectedEvent.construct( data=data, start_time=start_time, end_time=end_time, From faccc766bd1cbf699ddd77d48e78cc368a61f18d Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 17:02:27 +0100 Subject: [PATCH 10/24] Split master and minion pipelines Signed-off-by: Pedro Algarvio --- docker/elastic/centosstream8.Dockerfile | 2 +- docker/elastic/centosstream9.Dockerfile | 5 ++-- docker/elastic/conf/analytics.master.conf | 25 +++++++++++++++++++ ...t-analytics.conf => analytics.minion.conf} | 0 
docker/elastic/conf/beacons.conf | 8 +++--- docker/elastic/debian10.Dockerfile | 2 +- docker/elastic/debian11.Dockerfile | 5 ++-- 7 files changed, 35 insertions(+), 12 deletions(-) create mode 100644 docker/elastic/conf/analytics.master.conf rename docker/elastic/conf/{salt-analytics.conf => analytics.minion.conf} (100%) diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile index eac3024..9da4580 100644 --- a/docker/elastic/centosstream8.Dockerfile +++ b/docker/elastic/centosstream8.Dockerfile @@ -32,7 +32,7 @@ FROM base as minion-2 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf -ADD docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-2' > /etc/salt/minion.d/id.conf \ && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile index 49eaed2..dad5606 100644 --- a/docker/elastic/centosstream9.Dockerfile +++ b/docker/elastic/centosstream9.Dockerfile @@ -31,8 +31,7 @@ RUN ls -lah /src \ FROM base as master-1 ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf -ADD docker/elastic/conf/beacons.conf /etc/salt/master.d/beacons.conf -ADD docker/elastic/conf/salt-analytics.conf /etc/salt/master.d/salt-analytics.conf +ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf RUN mkdir -p /etc/salt/master.d \ && echo 'id: master-1' > /etc/salt/master.d/id.conf \ && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ @@ -45,7 +44,7 @@ FROM base as minion-1 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf -ADD 
docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-1' > /etc/salt/minion.d/id.conf \ && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/conf/analytics.master.conf b/docker/elastic/conf/analytics.master.conf new file mode 100644 index 0000000..337af31 --- /dev/null +++ b/docker/elastic/conf/analytics.master.conf @@ -0,0 +1,25 @@ +engines: + - analytics + +analytics: + + collectors: + events-collector: + plugin: event-bus + tags: + - "salt/job/*" + + processors: + job-aggregate: + plugin: job-aggregate + + forwarders: + test-forwarder: + plugin: test + + pipelines: + + jobs-pipeline: + collect: events-collector + process: job-aggregate + forward: test-forwarder diff --git a/docker/elastic/conf/salt-analytics.conf b/docker/elastic/conf/analytics.minion.conf similarity index 100% rename from docker/elastic/conf/salt-analytics.conf rename to docker/elastic/conf/analytics.minion.conf diff --git a/docker/elastic/conf/beacons.conf b/docker/elastic/conf/beacons.conf index c15fdde..07114d0 100644 --- a/docker/elastic/conf/beacons.conf +++ b/docker/elastic/conf/beacons.conf @@ -6,10 +6,10 @@ beacons: memusage: - percent: 1% - interval: 5 -# salt_monitor: -# - salt_fun: -# - test.ping -# - interval: 5 + salt_monitor: + - salt_fun: + - test.ping + - interval: 5 ## status: ## - interval: 5 ## swapusage: diff --git a/docker/elastic/debian10.Dockerfile b/docker/elastic/debian10.Dockerfile index e7b3dbc..9acc9a2 100644 --- a/docker/elastic/debian10.Dockerfile +++ b/docker/elastic/debian10.Dockerfile @@ -32,7 +32,7 @@ FROM base as minion-4 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf -ADD docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +ADD 
docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-4' > /etc/salt/minion.d/id.conf \ && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile index 3040f43..c16df17 100644 --- a/docker/elastic/debian11.Dockerfile +++ b/docker/elastic/debian11.Dockerfile @@ -31,8 +31,7 @@ RUN ls -lah /src \ FROM base as master-2 ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf -ADD docker/elastic/conf/beacons.conf /etc/salt/master.d/beacons.conf -ADD docker/elastic/conf/salt-analytics.conf /etc/salt/master.d/salt-analytics.conf +ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf RUN mkdir -p /etc/salt/master.d \ && echo 'id: master-2' > /etc/salt/master.d/id.conf \ && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ @@ -45,7 +44,7 @@ FROM base as minion-3 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf -ADD docker/elastic/conf/salt-analytics.conf /etc/salt/minion.d/salt-analytics.conf +ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-3' > /etc/salt/minion.d/id.conf \ && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ From 6e113e73ef30e9b5bb461b0ca353c19796003098 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 18:13:59 +0100 Subject: [PATCH 11/24] Add a scheduled `test.ping` job to the minions Signed-off-by: Pedro Algarvio --- docker/elastic/centosstream8.Dockerfile | 1 + docker/elastic/centosstream9.Dockerfile | 1 + docker/elastic/conf/beacons.conf | 8 ++++---- docker/elastic/conf/demo-schedule.conf | 4 ++++ docker/elastic/debian10.Dockerfile | 1 + docker/elastic/debian11.Dockerfile | 1 + 6 files changed, 12 
insertions(+), 4 deletions(-) create mode 100644 docker/elastic/conf/demo-schedule.conf diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile index 9da4580..e2dcf11 100644 --- a/docker/elastic/centosstream8.Dockerfile +++ b/docker/elastic/centosstream8.Dockerfile @@ -33,6 +33,7 @@ FROM base as minion-2 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf +ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-2' > /etc/salt/minion.d/id.conf \ && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile index dad5606..8365470 100644 --- a/docker/elastic/centosstream9.Dockerfile +++ b/docker/elastic/centosstream9.Dockerfile @@ -45,6 +45,7 @@ FROM base as minion-1 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf +ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-1' > /etc/salt/minion.d/id.conf \ && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/conf/beacons.conf b/docker/elastic/conf/beacons.conf index 07114d0..c15fdde 100644 --- a/docker/elastic/conf/beacons.conf +++ b/docker/elastic/conf/beacons.conf @@ -6,10 +6,10 @@ beacons: memusage: - percent: 1% - interval: 5 - salt_monitor: - - salt_fun: - - test.ping - - interval: 5 +# salt_monitor: +# - salt_fun: +# - test.ping +# - interval: 5 ## status: ## - interval: 5 ## swapusage: diff --git 
a/docker/elastic/conf/demo-schedule.conf b/docker/elastic/conf/demo-schedule.conf new file mode 100644 index 0000000..fe4100c --- /dev/null +++ b/docker/elastic/conf/demo-schedule.conf @@ -0,0 +1,4 @@ +schedule: + ping-minions: + function: test.ping + seconds: 5 diff --git a/docker/elastic/debian10.Dockerfile b/docker/elastic/debian10.Dockerfile index 9acc9a2..c1ad1d1 100644 --- a/docker/elastic/debian10.Dockerfile +++ b/docker/elastic/debian10.Dockerfile @@ -33,6 +33,7 @@ FROM base as minion-4 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf +ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-4' > /etc/salt/minion.d/id.conf \ && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile index c16df17..7822b37 100644 --- a/docker/elastic/debian11.Dockerfile +++ b/docker/elastic/debian11.Dockerfile @@ -45,6 +45,7 @@ FROM base as minion-3 ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf +ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-3' > /etc/salt/minion.d/id.conf \ && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ From e2046024c8bd9df9a76626fe3b6d7fe228b88205 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 19:00:26 +0100 Subject: [PATCH 12/24] Add default debug logging for the salt containers Signed-off-by: Pedro Algarvio --- docker/elastic/centosstream8.Dockerfile | 1 + docker/elastic/centosstream9.Dockerfile | 
2 ++ docker/elastic/conf/logging.conf | 1 + docker/elastic/debian10.Dockerfile | 1 + docker/elastic/debian11.Dockerfile | 2 ++ 5 files changed, 7 insertions(+) create mode 100644 docker/elastic/conf/logging.conf diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile index e2dcf11..6a97980 100644 --- a/docker/elastic/centosstream8.Dockerfile +++ b/docker/elastic/centosstream8.Dockerfile @@ -34,6 +34,7 @@ ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.co ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf +ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-2' > /etc/salt/minion.d/id.conf \ && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile index 8365470..baeab84 100644 --- a/docker/elastic/centosstream9.Dockerfile +++ b/docker/elastic/centosstream9.Dockerfile @@ -32,6 +32,7 @@ FROM base as master-1 ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf +ADD docker/elastic/conf/logging.conf /etc/salt/master.d/logging.conf RUN mkdir -p /etc/salt/master.d \ && echo 'id: master-1' > /etc/salt/master.d/id.conf \ && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ @@ -46,6 +47,7 @@ ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.co ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf +ADD 
docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-1' > /etc/salt/minion.d/id.conf \ && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/conf/logging.conf b/docker/elastic/conf/logging.conf new file mode 100644 index 0000000..7c7cf3d --- /dev/null +++ b/docker/elastic/conf/logging.conf @@ -0,0 +1 @@ +log_level_logfile: debug diff --git a/docker/elastic/debian10.Dockerfile b/docker/elastic/debian10.Dockerfile index c1ad1d1..aa178f1 100644 --- a/docker/elastic/debian10.Dockerfile +++ b/docker/elastic/debian10.Dockerfile @@ -34,6 +34,7 @@ ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.co ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf +ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-4' > /etc/salt/minion.d/id.conf \ && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile index 7822b37..4e302bc 100644 --- a/docker/elastic/debian11.Dockerfile +++ b/docker/elastic/debian11.Dockerfile @@ -32,6 +32,7 @@ FROM base as master-2 ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf +ADD docker/elastic/conf/logging.conf /etc/salt/master.d/logging.conf RUN mkdir -p /etc/salt/master.d \ && echo 'id: master-2' > /etc/salt/master.d/id.conf \ && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ @@ -46,6 +47,7 @@ ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.co ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD 
docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf +ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-3' > /etc/salt/minion.d/id.conf \ && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ From c9ed1a57dbbb112f3f168384d6a58cdd4b83b439 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Wed, 5 Jul 2023 19:01:04 +0100 Subject: [PATCH 13/24] Add debug logging to the job_aggregate processor Signed-off-by: Pedro Algarvio --- src/saf/process/job_aggregate.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/saf/process/job_aggregate.py b/src/saf/process/job_aggregate.py index f342b8f..30940ec 100644 --- a/src/saf/process/job_aggregate.py +++ b/src/saf/process/job_aggregate.py @@ -7,6 +7,7 @@ import fnmatch import logging +import pprint from typing import TYPE_CHECKING from typing import AsyncIterator from typing import Dict @@ -62,7 +63,9 @@ async def process( Aggregate received events, otherwise store in cache. 
""" if isinstance(event, EventBusCollectedEvent): + log.debug("Event Bus Collected Event:\n%s", pprint.pformat(event.dict())) salt_event = event.salt_event + log.debug("Event Bus Collected Salt Event:\n%s", pprint.pformat(salt_event)) tag = salt_event.tag data = salt_event.data if "watched_jids" not in ctx.cache: From 2a7deb5bfe402af779db6c9dc816466f5c40d9e4 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Thu, 6 Jul 2023 06:30:56 +0100 Subject: [PATCH 14/24] Add a `jobs_to_es` processor Signed-off-by: Pedro Algarvio --- examples/setup.cfg | 1 + .../safexamples/process/beacons_to_es.py | 9 ++- .../saltext/safexamples/process/jobs_to_es.py | 55 +++++++++++++++++++ 3 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 examples/src/saltext/safexamples/process/jobs_to_es.py diff --git a/examples/setup.cfg b/examples/setup.cfg index 00a7264..8fbd678 100644 --- a/examples/setup.cfg +++ b/examples/setup.cfg @@ -49,6 +49,7 @@ saf.process = notebook_output = saltext.safexamples.process.notebook_output numpy_save_keys = saltext.safexamples.process.numpy_save_keys beacons_to_es = saltext.safexamples.process.beacons_to_es + jobs_to_es = saltext.safexamples.process.jobs_to_es [bdist_wheel] # Use this option if your package is pure-python diff --git a/examples/src/saltext/safexamples/process/beacons_to_es.py b/examples/src/saltext/safexamples/process/beacons_to_es.py index 93326b5..afc9ea9 100644 --- a/examples/src/saltext/safexamples/process/beacons_to_es.py +++ b/examples/src/saltext/safexamples/process/beacons_to_es.py @@ -6,7 +6,7 @@ from __future__ import annotations import logging -import uuid +import pprint from typing import TYPE_CHECKING from typing import Any from typing import AsyncIterator @@ -45,7 +45,6 @@ async def process( """ data = cast(dict[str, Any], event.data).copy() data["role"] = ctx.info.salt.role - if TYPE_CHECKING: - assert event.timestamp - data["@timestamp"] = event.timestamp.isoformat() - yield 
ElasticSearchEvent.construct(index=event.beacon, id=str(uuid.uuid4()), data=data) + evt = ElasticSearchEvent.construct(index=event.beacon, data=data) + log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict())) + yield evt diff --git a/examples/src/saltext/safexamples/process/jobs_to_es.py b/examples/src/saltext/safexamples/process/jobs_to_es.py new file mode 100644 index 0000000..1228b07 --- /dev/null +++ b/examples/src/saltext/safexamples/process/jobs_to_es.py @@ -0,0 +1,55 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Convert a salt job aggregate event to an elastic search index event. +""" +from __future__ import annotations + +import json +import logging +import pprint +from typing import TYPE_CHECKING +from typing import AsyncIterator +from typing import Type + +from saf.forward.elasticsearch import ElasticSearchEvent +from saf.models import PipelineRunContext +from saf.models import ProcessConfigBase + +if TYPE_CHECKING: + from saf.process.job_aggregate import JobAggregateCollectedEvent + +log = logging.getLogger(__name__) + + +class SaltJobsToESConfig(ProcessConfigBase): + """ + Processor configuration. + """ + + +def get_config_schema() -> Type[SaltJobsToESConfig]: + """ + Get the jobs_to_es process plugin configuration schema. + """ + return SaltJobsToESConfig + + +async def process( + *, + ctx: PipelineRunContext[SaltJobsToESConfig],  # noqa: ARG001 + event: JobAggregateCollectedEvent, +) -> AsyncIterator[ElasticSearchEvent]: + """ + Method called to process a job aggregate event into an elastic search index event. 
+ """ + data = event.dict() + data.pop("data", None) + data.update(event.data) + # Have the return field always be a JSON string + if "return" in data: + data["return"] = json.dumps(data["return"]) + data["@timestamp"] = event.start_time + evt = ElasticSearchEvent.construct(index="salt_jobs", data=data) + log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict())) + yield evt From 9f792bcb5765cb8206c74f9d29ceab6c07dfb5ab Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Thu, 6 Jul 2023 06:31:26 +0100 Subject: [PATCH 15/24] Add custom script to loop to jobs we want to trigger from master Signed-off-by: Pedro Algarvio --- docker/elastic/centosstream8.Dockerfile | 5 +- docker/elastic/centosstream9.Dockerfile | 11 ++-- docker/elastic/conf/analytics.master.conf | 14 +++-- docker/elastic/conf/demo-schedule.conf | 4 -- .../elastic/conf/supervisord.loop-jobs.conf | 2 + docker/elastic/debian10.Dockerfile | 5 +- docker/elastic/debian11.Dockerfile | 11 ++-- docker/elastic/docker-compose.yml | 53 ++++++++++++++++--- docker/elastic/loop-jobs.sh | 10 ++++ 9 files changed, 85 insertions(+), 30 deletions(-) delete mode 100644 docker/elastic/conf/demo-schedule.conf create mode 100644 docker/elastic/conf/supervisord.loop-jobs.conf create mode 100644 docker/elastic/loop-jobs.sh diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile index 6a97980..c7f5144 100644 --- a/docker/elastic/centosstream8.Dockerfile +++ b/docker/elastic/centosstream8.Dockerfile @@ -30,14 +30,13 @@ RUN ls -lah /src \ FROM base as minion-2 +RUN dnf install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf RUN mkdir -p 
/etc/salt/minion.d \ && echo 'id: minion-2' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ - && dnf install -y salt-minion + && echo 'master: master-1' > /etc/salt/minion.d/master.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile index baeab84..e9f17d6 100644 --- a/docker/elastic/centosstream9.Dockerfile +++ b/docker/elastic/centosstream9.Dockerfile @@ -30,27 +30,28 @@ RUN ls -lah /src \ FROM base as master-1 +RUN dnf install -y salt-master ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf +ADD docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-jobs.conf +ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf ADD docker/elastic/conf/logging.conf /etc/salt/master.d/logging.conf RUN mkdir -p /etc/salt/master.d \ && echo 'id: master-1' > /etc/salt/master.d/id.conf \ - && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ - && dnf install -y salt-master + && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] FROM base as minion-1 +RUN dnf install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-1' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-1' > /etc/salt/minion.d/master.conf \ - && dnf install -y salt-minion + && echo 'master: master-1' > 
/etc/salt/minion.d/master.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/conf/analytics.master.conf b/docker/elastic/conf/analytics.master.conf index 337af31..51a628f 100644 --- a/docker/elastic/conf/analytics.master.conf +++ b/docker/elastic/conf/analytics.master.conf @@ -12,14 +12,20 @@ analytics: processors: job-aggregate: plugin: job-aggregate + cast-to-es: + plugin: jobs_to_es forwarders: - test-forwarder: - plugin: test + elasticsearch-forwarder: + plugin: elasticsearch + hosts: + - http://node01:9200 pipelines: jobs-pipeline: collect: events-collector - process: job-aggregate - forward: test-forwarder + process: + - job-aggregate + - cast-to-es + forward: elasticsearch-forwarder diff --git a/docker/elastic/conf/demo-schedule.conf b/docker/elastic/conf/demo-schedule.conf deleted file mode 100644 index fe4100c..0000000 --- a/docker/elastic/conf/demo-schedule.conf +++ /dev/null @@ -1,4 +0,0 @@ -schedule: - ping-minions: - function: test.ping - seconds: 5 diff --git a/docker/elastic/conf/supervisord.loop-jobs.conf b/docker/elastic/conf/supervisord.loop-jobs.conf new file mode 100644 index 0000000..6a51e74 --- /dev/null +++ b/docker/elastic/conf/supervisord.loop-jobs.conf @@ -0,0 +1,2 @@ +[program:loop-jobs] +command=/bin/sh /usr/bin/loop-jobs.sh diff --git a/docker/elastic/debian10.Dockerfile b/docker/elastic/debian10.Dockerfile index aa178f1..b266e3e 100644 --- a/docker/elastic/debian10.Dockerfile +++ b/docker/elastic/debian10.Dockerfile @@ -30,14 +30,13 @@ RUN ls -lah /src \ FROM base as minion-4 +RUN apt install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf ADD docker/elastic/conf/logging.conf 
/etc/salt/minion.d/logging.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-4' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ - && apt install -y salt-minion + && echo 'master: master-2' > /etc/salt/minion.d/master.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile index 4e302bc..dca00b8 100644 --- a/docker/elastic/debian11.Dockerfile +++ b/docker/elastic/debian11.Dockerfile @@ -30,27 +30,28 @@ RUN ls -lah /src \ FROM base as master-2 +RUN apt install -y salt-master ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf +ADD docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-jobs.conf +ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf ADD docker/elastic/conf/logging.conf /etc/salt/master.d/logging.conf RUN mkdir -p /etc/salt/master.d \ && echo 'id: master-2' > /etc/salt/master.d/id.conf \ - && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf \ - && apt install -y salt-master + && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] FROM base as minion-3 +RUN apt install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/demo-schedule.conf /etc/salt/minion.d/demo-schedule.conf ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf RUN mkdir -p /etc/salt/minion.d \ && echo 'id: minion-3' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-2' > /etc/salt/minion.d/master.conf \ - && apt install -y salt-minion + && echo 'master: master-2' > 
/etc/salt/minion.d/master.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/docker-compose.yml b/docker/elastic/docker-compose.yml index 5e2902c..3394f22 100644 --- a/docker/elastic/docker-compose.yml +++ b/docker/elastic/docker-compose.yml @@ -30,6 +30,15 @@ services: - es-network depends_on: - node01 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'", + ] + interval: 10s + timeout: 10s + retries: 120 metricbeat: image: docker.elastic.co/beats/metricbeat:${STACK_VERSION} @@ -45,7 +54,8 @@ services: master-1: depends_on: - - kibana + kibana: + condition: service_healthy container_name: master-1 build: context: ../../ @@ -55,9 +65,21 @@ services: es-network: aliases: - master-1 + healthcheck: + test: + [ + "CMD-SHELL", + "ps $(cat /var/run/salt-master.pid) > /dev//null 2>&1 && test -S /var/run/salt/master/workers.ipc", + ] + interval: 5s + timeout: 10s + retries: 120 minion-1: depends_on: - - kibana + kibana: + condition: service_healthy + master-1: + condition: service_healthy container_name: minion-1 build: context: ../../ @@ -69,7 +91,10 @@ services: - minion-1 minion-2: depends_on: - - kibana + kibana: + condition: service_healthy + master-1: + condition: service_healthy container_name: minion-2 build: context: ../../ @@ -81,7 +106,8 @@ services: - minion-2 master-2: depends_on: - - kibana + kibana: + condition: service_healthy container_name: master-2 build: context: ../../ @@ -91,9 +117,21 @@ services: es-network: aliases: - master-2 + healthcheck: + test: + [ + "CMD-SHELL", + "ps $(cat /var/run/salt-master.pid) > /dev//null 2>&1 && test -S /var/run/salt/master/workers.ipc", + ] + interval: 5s + timeout: 10s + retries: 120 minion-3: depends_on: - - kibana + kibana: + condition: service_healthy + master-2: + condition: service_healthy container_name: minion-3 build: context: ../../ @@ -105,7 +143,10 @@ services: - minion-3 minion-4: depends_on: - - 
kibana + kibana: + condition: service_healthy + master-2: + condition: service_healthy container_name: minion-4 build: context: ../../ diff --git a/docker/elastic/loop-jobs.sh b/docker/elastic/loop-jobs.sh new file mode 100644 index 0000000..877dd8a --- /dev/null +++ b/docker/elastic/loop-jobs.sh @@ -0,0 +1,10 @@ +#!/bin/sh +sleep 20 +while true; do + SLEEP=$(shuf -i 3-10 -n 1) + sleep $SLEEP + /usr/bin/salt \* test.fib 3 + SLEEP=$(shuf -i 3-10 -n 1) + sleep $SLEEP + /usr/bin/salt \* test.ping +done From 88def5bd087055c80197edd339807204e7985cc4 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Thu, 6 Jul 2023 11:47:03 +0100 Subject: [PATCH 16/24] Some elastic example salt config file changes Signed-off-by: Pedro Algarvio --- docker/elastic/centosstream8.Dockerfile | 5 +---- docker/elastic/centosstream9.Dockerfile | 10 ++-------- docker/elastic/conf/logging.conf | 1 - docker/elastic/conf/master-1.conf | 5 +++++ docker/elastic/conf/master-2.conf | 5 +++++ docker/elastic/conf/minion-1.conf | 6 ++++++ docker/elastic/conf/minion-2.conf | 6 ++++++ docker/elastic/conf/minion-3.conf | 6 ++++++ docker/elastic/conf/minion-4.conf | 6 ++++++ docker/elastic/debian10.Dockerfile | 5 +---- docker/elastic/debian11.Dockerfile | 10 ++-------- 11 files changed, 40 insertions(+), 25 deletions(-) delete mode 100644 docker/elastic/conf/logging.conf create mode 100644 docker/elastic/conf/master-1.conf create mode 100644 docker/elastic/conf/master-2.conf create mode 100644 docker/elastic/conf/minion-1.conf create mode 100644 docker/elastic/conf/minion-2.conf create mode 100644 docker/elastic/conf/minion-3.conf create mode 100644 docker/elastic/conf/minion-4.conf diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile index c7f5144..2415392 100644 --- a/docker/elastic/centosstream8.Dockerfile +++ b/docker/elastic/centosstream8.Dockerfile @@ -34,9 +34,6 @@ RUN dnf install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf 
/etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf -RUN mkdir -p /etc/salt/minion.d \ - && echo 'id: minion-2' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-1' > /etc/salt/minion.d/master.conf +ADD docker/elastic/conf/minion-2.conf /etc/salt/minion.d/minion-2.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile index e9f17d6..df26ae4 100644 --- a/docker/elastic/centosstream9.Dockerfile +++ b/docker/elastic/centosstream9.Dockerfile @@ -35,10 +35,7 @@ ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.co ADD docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-jobs.conf ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf -ADD docker/elastic/conf/logging.conf /etc/salt/master.d/logging.conf -RUN mkdir -p /etc/salt/master.d \ - && echo 'id: master-1' > /etc/salt/master.d/id.conf \ - && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf +ADD docker/elastic/conf/master-1.conf /etc/salt/master.d/master-1.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] @@ -49,9 +46,6 @@ RUN dnf install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf -RUN mkdir -p /etc/salt/minion.d \ - && echo 'id: minion-1' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-1' > /etc/salt/minion.d/master.conf +ADD 
docker/elastic/conf/minion-1.conf /etc/salt/minion.d/minion-1.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/conf/logging.conf b/docker/elastic/conf/logging.conf deleted file mode 100644 index 7c7cf3d..0000000 --- a/docker/elastic/conf/logging.conf +++ /dev/null @@ -1 +0,0 @@ -log_level_logfile: debug diff --git a/docker/elastic/conf/master-1.conf b/docker/elastic/conf/master-1.conf new file mode 100644 index 0000000..591e796 --- /dev/null +++ b/docker/elastic/conf/master-1.conf @@ -0,0 +1,5 @@ +id: master-1 +open_mode: true +log_level_logfile: debug +grains: + datacenter: dt-a diff --git a/docker/elastic/conf/master-2.conf b/docker/elastic/conf/master-2.conf new file mode 100644 index 0000000..eec4045 --- /dev/null +++ b/docker/elastic/conf/master-2.conf @@ -0,0 +1,5 @@ +id: master-2 +open_mode: true +log_level_logfile: debug +grains: + datacenter: dt-b diff --git a/docker/elastic/conf/minion-1.conf b/docker/elastic/conf/minion-1.conf new file mode 100644 index 0000000..d4a221d --- /dev/null +++ b/docker/elastic/conf/minion-1.conf @@ -0,0 +1,6 @@ +id: minion-1 +master: master-1 +log_level_logfile: debug +grains: + role: db + datacenter: dt-a diff --git a/docker/elastic/conf/minion-2.conf b/docker/elastic/conf/minion-2.conf new file mode 100644 index 0000000..44a42b8 --- /dev/null +++ b/docker/elastic/conf/minion-2.conf @@ -0,0 +1,6 @@ +id: minion-2 +master: master-1 +log_level_logfile: debug +grains: + role: web + datacenter: dt-a diff --git a/docker/elastic/conf/minion-3.conf b/docker/elastic/conf/minion-3.conf new file mode 100644 index 0000000..889ba7e --- /dev/null +++ b/docker/elastic/conf/minion-3.conf @@ -0,0 +1,6 @@ +id: minion-3 +master: master-2 +log_level_logfile: debug +grains: + role: db + datacenter: dt-b diff --git a/docker/elastic/conf/minion-4.conf b/docker/elastic/conf/minion-4.conf new file mode 100644 index 0000000..52a390f --- /dev/null +++ b/docker/elastic/conf/minion-4.conf @@ -0,0 +1,6 
@@ +id: minion-4 +master: master-2 +log_level_logfile: debug +grains: + role: web + datacenter: dt-b diff --git a/docker/elastic/debian10.Dockerfile b/docker/elastic/debian10.Dockerfile index b266e3e..2c3d82f 100644 --- a/docker/elastic/debian10.Dockerfile +++ b/docker/elastic/debian10.Dockerfile @@ -34,9 +34,6 @@ RUN apt install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf -RUN mkdir -p /etc/salt/minion.d \ - && echo 'id: minion-4' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-2' > /etc/salt/minion.d/master.conf +ADD docker/elastic/conf/minion-4.conf /etc/salt/minion.d/minion-4.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile index dca00b8..7162506 100644 --- a/docker/elastic/debian11.Dockerfile +++ b/docker/elastic/debian11.Dockerfile @@ -35,10 +35,7 @@ ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.co ADD docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-jobs.conf ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf -ADD docker/elastic/conf/logging.conf /etc/salt/master.d/logging.conf -RUN mkdir -p /etc/salt/master.d \ - && echo 'id: master-2' > /etc/salt/master.d/id.conf \ - && echo 'open_mode: true' > /etc/salt/master.d/open-mode.conf +ADD docker/elastic/conf/master-1.conf /etc/salt/master.d/master-1.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] @@ -49,9 +46,6 @@ RUN apt install -y salt-minion ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf ADD 
docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf -ADD docker/elastic/conf/logging.conf /etc/salt/minion.d/logging.conf -RUN mkdir -p /etc/salt/minion.d \ - && echo 'id: minion-3' > /etc/salt/minion.d/id.conf \ - && echo 'master: master-2' > /etc/salt/minion.d/master.conf +ADD docker/elastic/conf/minion-3.conf /etc/salt/minion.d/minion-3.conf CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] From 31a5f21fbbf5a83ffd1a521c5b9d37843c0cf05d Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Thu, 6 Jul 2023 11:57:29 +0100 Subject: [PATCH 17/24] Allow providing the elastic search index in the configuration Signed-off-by: Pedro Algarvio --- src/saf/forward/elasticsearch.py | 33 ++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/saf/forward/elasticsearch.py b/src/saf/forward/elasticsearch.py index ac9012e..03fa381 100644 --- a/src/saf/forward/elasticsearch.py +++ b/src/saf/forward/elasticsearch.py @@ -17,6 +17,7 @@ from typing import List from typing import Optional from typing import Type +from typing import cast from pydantic import Field @@ -74,6 +75,7 @@ class ElasticSearchConfig(ForwardConfigBase): username: Optional[str] = None password: Optional[str] = None timeout: int = 10 + index: Optional[str] = None def get_config_schema() -> Type[ElasticSearchConfig]: @@ -86,7 +88,7 @@ def get_config_schema() -> Type[ElasticSearchConfig]: async def forward( *, ctx: PipelineRunContext[ElasticSearchConfig], - event: ElasticSearchEvent, + event: ElasticSearchEvent | CollectedEvent, ) -> None: """ Method called to forward the event. 
@@ -106,8 +108,31 @@ async def forward( connection_info = await client.info() log.warning("ES Connection Info:\n%s", pprint.pformat(connection_info)) client = ctx.cache["client"] - data = event.data.copy() + if isinstance(event, ElasticSearchEvent): + es_id = event.id + es_index = event.index + else: + es_id = str(uuid.uuid4()) + if ctx.config.index is None: + msg = ( + "Event to forward is not an instance of ElasticSearchEvent and the " + "elasticsearch plugin index configuration is not defined." + ) + raise RuntimeError(msg) + es_index = ctx.config.index + data = cast(Dict[str, Any], event.data).copy() if "@timestamp" not in data: data["@timestamp"] = event.timestamp - ret = await client.index(index=event.index, id=event.id, body=data) - log.warning("ES SEND:\n%s", pprint.pformat(ret)) + log.debug( + "ElasticSearch Forward Details:\nindex: %s\nid: %s\ndata:\n%s", + es_index, + es_id, + pprint.pformat(data), + ) + ret = await client.index(index=es_index, id=es_id, body=data) + log.debug( + "ElasticSearch Forward Response(index: %s; id: %s):\n%s", + es_index, + es_id, + pprint.pformat(ret), + ) From d522beaa4b4c0b6bbcb8cf0bf5ba4e57134fd667 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Thu, 6 Jul 2023 18:15:04 +0100 Subject: [PATCH 18/24] The `jobs` should be optionally set. Signed-off-by: Pedro Algarvio --- src/saf/process/job_aggregate.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/saf/process/job_aggregate.py b/src/saf/process/job_aggregate.py index 30940ec..3aa627a 100644 --- a/src/saf/process/job_aggregate.py +++ b/src/saf/process/job_aggregate.py @@ -14,6 +14,8 @@ from typing import Set from typing import Type +from pydantic import Field + from saf.collect.event_bus import EventBusCollectedEvent from saf.collect.grains import GrainsCollectedEvent from saf.models import CollectedEvent @@ -32,7 +34,7 @@ class JobAggregateConfig(ProcessConfigBase): Job aggregate collector configuration. 
""" - jobs: Set[str] + jobs: Set[str] = Field(default_factory=set) def get_config_schema() -> Type[JobAggregateConfig]: @@ -74,7 +76,10 @@ async def process( jid = tag.split("/")[2] # We will probably want to make this condition configurable salt_func = data.get("fun", "") - for func_filter in ctx.config.jobs: + matching_jobs = ctx.config.jobs + if not matching_jobs: + matching_jobs.add("*") + for func_filter in matching_jobs: if fnmatch.fnmatch(salt_func, func_filter): if jid not in ctx.cache["watched_jids"]: ctx.cache["watched_jids"][jid] = { From 20b30af699c9f5ecfb6576f7a31775e85237ece1 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Fri, 7 Jul 2023 18:32:08 +0100 Subject: [PATCH 19/24] Fix paths to `supervisord.conf` Signed-off-by: Pedro Algarvio --- docker/elastic/centosstream8.Dockerfile | 4 ++-- docker/elastic/centosstream9.Dockerfile | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile index 2415392..823aaf3 100644 --- a/docker/elastic/centosstream8.Dockerfile +++ b/docker/elastic/centosstream8.Dockerfile @@ -11,7 +11,7 @@ RUN dnf update -y \ && dnf install -y multitail supervisor RUN mkdir -p /etc/supervisor/conf.d/ -ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf +ADD docker/elastic/conf/supervisord.conf /etc/supervisord.conf RUN rpm --import https://repo.saltproject.io/salt/py3/redhat/9/x86_64/SALT-PROJECT-GPG-PUBKEY-2023.pub \ && curl -fsSL https://repo.saltproject.io/salt/py3/redhat/9/x86_64/3006.repo | tee /etc/yum.repos.d/salt.repo \ @@ -36,4 +36,4 @@ ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf ADD docker/elastic/conf/minion-2.conf /etc/salt/minion.d/minion-2.conf -CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] +CMD ["/usr/bin/supervisord","-c","/etc/supervisord.conf"] diff --git 
a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile index df26ae4..43047b8 100644 --- a/docker/elastic/centosstream9.Dockerfile +++ b/docker/elastic/centosstream9.Dockerfile @@ -11,7 +11,7 @@ RUN dnf update -y \ && dnf install -y multitail supervisor RUN mkdir -p /etc/supervisor/conf.d/ -ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf +ADD docker/elastic/conf/supervisord.conf /etc/supervisord.conf RUN rpm --import https://repo.saltproject.io/salt/py3/redhat/9/x86_64/SALT-PROJECT-GPG-PUBKEY-2023.pub \ && curl -fsSL https://repo.saltproject.io/salt/py3/redhat/9/x86_64/3006.repo | tee /etc/yum.repos.d/salt.repo \ @@ -37,7 +37,7 @@ ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf ADD docker/elastic/conf/master-1.conf /etc/salt/master.d/master-1.conf -CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] +CMD ["/usr/bin/supervisord","-c","/etc/supervisord.conf"] FROM base as minion-1 @@ -48,4 +48,4 @@ ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf ADD docker/elastic/conf/minion-1.conf /etc/salt/minion.d/minion-1.conf -CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] +CMD ["/usr/bin/supervisord","-c","/etc/supervisord.conf"] From 368d93aa79f8964d2d84ef4eb5102ec850bcd3ea Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Fri, 7 Jul 2023 18:33:17 +0100 Subject: [PATCH 20/24] `fun_args` also needs to be JSON dumped so that Elastic Search doesn't complain Signed-off-by: Pedro Algarvio --- examples/src/saltext/safexamples/process/jobs_to_es.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/src/saltext/safexamples/process/jobs_to_es.py b/examples/src/saltext/safexamples/process/jobs_to_es.py index 1228b07..8d2b9b0 100644 --- 
a/examples/src/saltext/safexamples/process/jobs_to_es.py +++ b/examples/src/saltext/safexamples/process/jobs_to_es.py @@ -46,9 +46,11 @@ async def process( data = event.dict() data.pop("data", None) data.update(event.data) - # Have the return field always be a JSON string - if "return" in data: - data["return"] = json.dumps(data["return"]) + # Some fields must be cast to JSON strings or ES will complain about + # different types + for key in ("fun_args", "return"): + if key in data: + data[key] = json.dumps(data[key]) data["@timestamp"] = event.start_time evt = ElasticSearchEvent.construct(index="salt_jobs", data=data) log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict())) From 356c24dc017a3289ddd9aac3b7211ffb03155cf0 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Fri, 7 Jul 2023 18:33:56 +0100 Subject: [PATCH 21/24] Fix elastic search docker example after the include grains update Signed-off-by: Pedro Algarvio --- docker/elastic/conf/analytics.master.conf | 13 +++++++- src/saf/collect/event_bus.py | 2 +- src/saf/collect/grains.py | 10 +++--- src/saf/process/job_aggregate.py | 38 +++++++++++++---------- 4 files changed, 41 insertions(+), 22 deletions(-) diff --git a/docker/elastic/conf/analytics.master.conf b/docker/elastic/conf/analytics.master.conf index 51a628f..7bf0b1f 100644 --- a/docker/elastic/conf/analytics.master.conf +++ b/docker/elastic/conf/analytics.master.conf @@ -9,6 +9,15 @@ analytics: tags: - "salt/job/*" + grains-collector: + plugin: grains + interval: 30 + grains: + - "os" + - "id" + - "role" + - "datacenter" + processors: job-aggregate: plugin: job-aggregate @@ -24,7 +33,9 @@ analytics: pipelines: jobs-pipeline: - collect: events-collector + collect: + - grains-collector + - events-collector process: - job-aggregate - cast-to-es diff --git a/src/saf/collect/event_bus.py b/src/saf/collect/event_bus.py index ca6e1dc..997da3f 100644 --- a/src/saf/collect/event_bus.py +++ b/src/saf/collect/event_bus.py @@ -52,4 +52,4 @@ async def 
collect( salt_event: SaltEvent log.info("The event bus collect plugin is configured to listen to tags: %s", config.tags) async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=config.tags): - yield EventBusCollectedEvent(salt_event=salt_event, data={"tag": salt_event.tag}) + yield EventBusCollectedEvent.construct(salt_event=salt_event, data={"tag": salt_event.tag}) diff --git a/src/saf/collect/grains.py b/src/saf/collect/grains.py index a3ff74c..4282cba 100644 --- a/src/saf/collect/grains.py +++ b/src/saf/collect/grains.py @@ -7,11 +7,13 @@ import asyncio import logging +from typing import Any from typing import AsyncIterator from typing import Dict from typing import List from typing import Type +from pydantic import Field from salt.client import LocalClient from saf.models import CollectConfigBase @@ -26,9 +28,9 @@ class GrainsConfig(CollectConfigBase): Configuration schema for the beacons collect plugin. """ - targets: str = "*" + targets: str = Field(default="*") grains: List[str] - interval: float = 5 + interval: float = Field(default=5) def get_config_schema() -> Type[GrainsConfig]: @@ -44,7 +46,7 @@ class GrainsCollectedEvent(CollectedEvent): """ minion: str - grains: Dict[str, str] + grains: Dict[str, Any] async def collect(*, ctx: PipelineRunContext[GrainsConfig]) -> AsyncIterator[GrainsCollectedEvent]: @@ -58,6 +60,6 @@ async def collect(*, ctx: PipelineRunContext[GrainsConfig]) -> AsyncIterator[Gra ret = client.cmd(config.targets, "grains.item", arg=config.grains) for minion, grains in ret.items(): if grains: - event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains) + event = GrainsCollectedEvent.construct(data=ret, minion=minion, grains=grains) yield event await asyncio.sleep(config.interval) diff --git a/src/saf/process/job_aggregate.py b/src/saf/process/job_aggregate.py index 3aa627a..280df8e 100644 --- a/src/saf/process/job_aggregate.py +++ b/src/saf/process/job_aggregate.py @@ -34,7 +34,7 @@ class 
JobAggregateConfig(ProcessConfigBase): Job aggregate collector configuration. """ - jobs: Set[str] = Field(default_factory=set) + jobs: Set[str] = Field(default_factory=lambda: {"*"}) def get_config_schema() -> Type[JobAggregateConfig]: @@ -59,7 +59,7 @@ class JobAggregateCollectedEvent(CollectedEvent): async def process( *, ctx: PipelineRunContext[JobAggregateConfig], - event: CollectedEvent, + event: EventBusCollectedEvent | GrainsCollectedEvent, ) -> AsyncIterator[CollectedEvent]: """ Aggregate received events, otherwise store in cache. @@ -72,15 +72,20 @@ async def process( data = salt_event.data if "watched_jids" not in ctx.cache: ctx.cache["watched_jids"] = {} + if "waiting_for_grains" not in ctx.cache: + ctx.cache["waiting_for_grains"] = {} if fnmatch.fnmatch(tag, "salt/job/*/new"): jid = tag.split("/")[2] # We will probably want to make this condition configurable salt_func = data.get("fun", "") - matching_jobs = ctx.config.jobs - if not matching_jobs: - matching_jobs.add("*") - for func_filter in matching_jobs: + for func_filter in ctx.config.jobs: if fnmatch.fnmatch(salt_func, func_filter): + log.debug( + "The job with JID %r and func %r matched function filter %r", + jid, + salt_func, + func_filter, + ) if jid not in ctx.cache["watched_jids"]: ctx.cache["watched_jids"][jid] = { "minions": set(data["minions"]), @@ -113,17 +118,18 @@ async def process( if grains: yield ret else: - if "waiting_for_grains" not in ctx.cache: - ctx.cache["waiting_for_grains"] = set() - ctx.cache["waiting_for_grains"].add(ret) + if minion_id not in ctx.cache["waiting_for_grains"]: + ctx.cache["waiting_for_grains"][minion_id] = [] + ctx.cache["waiting_for_grains"][minion_id].append(ret) + else: + log.debug( + "The JID %r was not found in the 'watched_jids' processor cache. 
Ignoring", jid + ) elif isinstance(event, GrainsCollectedEvent): if "grains" not in ctx.cache: ctx.cache["grains"] = {} ctx.cache["grains"][event.minion] = event.grains - waiting = ctx.cache.get("waiting_for_grains") - if waiting: - to_remove = [agg_event for agg_event in waiting if agg_event.minion_id == event.minion] - for event_with_grains in to_remove: - event_with_grains.grains = event.grains - waiting.remove(event_with_grains) - yield event_with_grains + waiting_events = ctx.cache["waiting_for_grains"].pop(event.minion, ()) + for event_with_grains in waiting_events: + event_with_grains.grains = event.grains + yield event_with_grains From db0356b96b6f3fc5825f7e0260a6f1c380d3c767 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Fri, 7 Jul 2023 19:47:15 +0100 Subject: [PATCH 22/24] Add a state run index to the elastic search docker example Signed-off-by: Pedro Algarvio --- docker/elastic/centosstream9.Dockerfile | 1 + docker/elastic/conf/analytics.master.conf | 19 +++++- docker/elastic/debian11.Dockerfile | 1 + docker/elastic/loop-jobs.sh | 12 ++++ docker/elastic/state-tree/add-user.sls | 4 ++ docker/elastic/state-tree/install-nginx.sls | 2 + docker/elastic/state-tree/remove-user.sls | 4 ++ docker/elastic/state-tree/uninstall-nginx.sls | 2 + examples/setup.cfg | 1 + .../safexamples/process/state_jobs_to_es.py | 58 +++++++++++++++++++ 10 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 docker/elastic/state-tree/add-user.sls create mode 100644 docker/elastic/state-tree/install-nginx.sls create mode 100644 docker/elastic/state-tree/remove-user.sls create mode 100644 docker/elastic/state-tree/uninstall-nginx.sls create mode 100644 examples/src/saltext/safexamples/process/state_jobs_to_es.py diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile index 43047b8..cad10f7 100644 --- a/docker/elastic/centosstream9.Dockerfile +++ b/docker/elastic/centosstream9.Dockerfile @@ -36,6 +36,7 @@ ADD 
docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-j ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf ADD docker/elastic/conf/master-1.conf /etc/salt/master.d/master-1.conf +ADD docker/elastic/state-tree/*.sls /srv/salt/ CMD ["/usr/bin/supervisord","-c","/etc/supervisord.conf"] diff --git a/docker/elastic/conf/analytics.master.conf b/docker/elastic/conf/analytics.master.conf index 7bf0b1f..a8b679b 100644 --- a/docker/elastic/conf/analytics.master.conf +++ b/docker/elastic/conf/analytics.master.conf @@ -4,7 +4,7 @@ engines: analytics: collectors: - events-collector: + jobs-collector: plugin: event-bus tags: - "salt/job/*" @@ -21,8 +21,14 @@ analytics: processors: job-aggregate: plugin: job-aggregate + state-job-aggregate: + plugin: job-aggregate + jobs: + - "state.*" cast-to-es: plugin: jobs_to_es + state-cast-to-es: + plugin: state_jobs_to_es forwarders: elasticsearch-forwarder: @@ -35,8 +41,17 @@ analytics: jobs-pipeline: collect: - grains-collector - - events-collector + - jobs-collector process: - job-aggregate - cast-to-es forward: elasticsearch-forwarder + + state-jobs-pipeline: + collect: + - grains-collector + - jobs-collector + process: + - state-job-aggregate + - state-cast-to-es + forward: elasticsearch-forwarder diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile index 7162506..9383799 100644 --- a/docker/elastic/debian11.Dockerfile +++ b/docker/elastic/debian11.Dockerfile @@ -36,6 +36,7 @@ ADD docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-j ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf ADD docker/elastic/conf/master-1.conf /etc/salt/master.d/master-1.conf +ADD docker/elastic/state-tree/*.sls /srv/salt/ CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"] diff --git 
a/docker/elastic/loop-jobs.sh b/docker/elastic/loop-jobs.sh index 877dd8a..d6d9871 100644 --- a/docker/elastic/loop-jobs.sh +++ b/docker/elastic/loop-jobs.sh @@ -7,4 +7,16 @@ while true; do SLEEP=$(shuf -i 3-10 -n 1) sleep $SLEEP /usr/bin/salt \* test.ping + SLEEP=$(shuf -i 3-10 -n 1) + sleep $SLEEP + /usr/bin/salt \* state.sls add-user + SLEEP=$(shuf -i 3-10 -n 1) + sleep $SLEEP + /usr/bin/salt \* state.sls remove-user + SLEEP=$(shuf -i 3-10 -n 1) + sleep $SLEEP + /usr/bin/salt \* state.sls install-nginx + SLEEP=$(shuf -i 3-10 -n 1) + sleep $SLEEP + /usr/bin/salt \* state.sls uninstall-nginx done diff --git a/docker/elastic/state-tree/add-user.sls b/docker/elastic/state-tree/add-user.sls new file mode 100644 index 0000000..478c527 --- /dev/null +++ b/docker/elastic/state-tree/add-user.sls @@ -0,0 +1,4 @@ +james-bond: + user.present: + - createhome: true + - empty_password: true diff --git a/docker/elastic/state-tree/install-nginx.sls b/docker/elastic/state-tree/install-nginx.sls new file mode 100644 index 0000000..517dce7 --- /dev/null +++ b/docker/elastic/state-tree/install-nginx.sls @@ -0,0 +1,2 @@ +nginx: + pkg.installed diff --git a/docker/elastic/state-tree/remove-user.sls b/docker/elastic/state-tree/remove-user.sls new file mode 100644 index 0000000..de801bf --- /dev/null +++ b/docker/elastic/state-tree/remove-user.sls @@ -0,0 +1,4 @@ +james-bond: + user.absent: + - purge: true + - force: true diff --git a/docker/elastic/state-tree/uninstall-nginx.sls b/docker/elastic/state-tree/uninstall-nginx.sls new file mode 100644 index 0000000..5edc4bb --- /dev/null +++ b/docker/elastic/state-tree/uninstall-nginx.sls @@ -0,0 +1,2 @@ +nginx: + pkg.removed diff --git a/examples/setup.cfg b/examples/setup.cfg index 8fbd678..1f502a5 100644 --- a/examples/setup.cfg +++ b/examples/setup.cfg @@ -50,6 +50,7 @@ saf.process = numpy_save_keys = saltext.safexamples.process.numpy_save_keys beacons_to_es = saltext.safexamples.process.beacons_to_es jobs_to_es = 
saltext.safexamples.process.jobs_to_es + state_jobs_to_es = saltext.safexamples.process.state_jobs_to_es [bdist_wheel] # Use this option if your package is pure-python diff --git a/examples/src/saltext/safexamples/process/state_jobs_to_es.py b/examples/src/saltext/safexamples/process/state_jobs_to_es.py new file mode 100644 index 0000000..a18ac62 --- /dev/null +++ b/examples/src/saltext/safexamples/process/state_jobs_to_es.py @@ -0,0 +1,58 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Convert a state job event to an elastic search index event. +""" +from __future__ import annotations + +import json +import logging +import pprint +from typing import TYPE_CHECKING +from typing import AsyncIterator +from typing import Type + +from saf.forward.elasticsearch import ElasticSearchEvent +from saf.models import PipelineRunContext +from saf.models import ProcessConfigBase + +if TYPE_CHECKING: + from saf.process.job_aggregate import JobAggregateCollectedEvent + +log = logging.getLogger(__name__) + + +class SaltStateJobsToESConfig(ProcessConfigBase): + """ + Processor configuration. + """ + + +def get_config_schema() -> Type[SaltStateJobsToESConfig]: + """ + Get the state jobs to ES process plugin configuration schema. + """ + return SaltStateJobsToESConfig + + +async def process( + *, + ctx: PipelineRunContext[SaltStateJobsToESConfig],  # noqa: ARG001 + event: JobAggregateCollectedEvent, +) -> AsyncIterator[ElasticSearchEvent]: + """ + Method called to process state job events. 
+ """ + data = event.dict() + data.pop("data", None) + data.update(event.data) + data["state_name"] = data["fun_args"][0] + # Some field must be cast to JSON strings or EL will complain about + # different types + for key in ("fun_args", "return"): + if key in data: + data[key] = json.dumps(data[key]) + data["@timestamp"] = event.start_time + evt = ElasticSearchEvent.construct(index="salt_state_runs", data=data) + log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict())) + yield evt From 93a8bae8a4d609c760075737e926061a6dca8e34 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Mon, 10 Jul 2023 14:32:05 +0100 Subject: [PATCH 23/24] Setup the cache sooner Signed-off-by: Pedro Algarvio --- src/saf/process/job_aggregate.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/saf/process/job_aggregate.py b/src/saf/process/job_aggregate.py index 280df8e..b886247 100644 --- a/src/saf/process/job_aggregate.py +++ b/src/saf/process/job_aggregate.py @@ -64,16 +64,16 @@ async def process( """ Aggregate received events, otherwise store in cache. 
""" + if "watched_jids" not in ctx.cache: + ctx.cache["watched_jids"] = {} + if "waiting_for_grains" not in ctx.cache: + ctx.cache["waiting_for_grains"] = {} if isinstance(event, EventBusCollectedEvent): log.debug("Event Bus Collected Event:\n%s", pprint.pformat(event.dict())) salt_event = event.salt_event log.debug("Event Bus Collected Salt Event:\n%s", pprint.pformat(salt_event)) tag = salt_event.tag data = salt_event.data - if "watched_jids" not in ctx.cache: - ctx.cache["watched_jids"] = {} - if "waiting_for_grains" not in ctx.cache: - ctx.cache["waiting_for_grains"] = {} if fnmatch.fnmatch(tag, "salt/job/*/new"): jid = tag.split("/")[2] # We will probably want to make this condition configurable From e4c773a0991d923f359440546a2fd34d95fa7bc9 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Tue, 18 Jul 2023 18:36:46 +0100 Subject: [PATCH 24/24] Catch `JSONDecodeError` errors Signed-off-by: Pedro Algarvio --- tools/ci.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/ci.py b/tools/ci.py index 7699378..fc23610 100644 --- a/tools/ci.py +++ b/tools/ci.py @@ -88,7 +88,12 @@ def download_onedir( tempdir_path = pathlib.Path(tempfile.gettempdir()) with ctx.web: repo_json_file = _download_file(ctx, repo_json_url, tempdir_path / "repo.json") - repo_json_data = json.loads(repo_json_file.read_text()) + repo_json_contents = repo_json_file.read_text() + try: + repo_json_data = json.loads(repo_json_contents) + except json.JSONDecodeError: + ctx.error("Failed to parse JSON from:\n{repo_json_contents}") + ctx.exit(1) ctx.info("Contents of the downloaded 'repo.json' file:") ctx.print(repo_json_data, soft_wrap=True) if salt_version not in repo_json_data: