diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..9b89590
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1 @@
+RADIO
diff --git a/changelog/68.bugfix.rst b/changelog/68.bugfix.rst
new file mode 100644
index 0000000..804c729
--- /dev/null
+++ b/changelog/68.bugfix.rst
@@ -0,0 +1 @@
+Fix regression which prevented pipelines without processors from running.
diff --git a/changelog/68.improvement.rst b/changelog/68.improvement.rst
new file mode 100644
index 0000000..56c78a4
--- /dev/null
+++ b/changelog/68.improvement.rst
@@ -0,0 +1,5 @@
+Several improvements to the project:
+
+* When loading plugin configs, catch `KeyError` on unavailable plugins and list all available plugins.
+* Pass the event tag and stamp explicitly when calling `saf.utils.eventbus._construct_event()`.
+* Add the `id` attribute to `BeaconCollectedEvent`, which is the minion ID.
diff --git a/changelog/69.improvement.rst b/changelog/69.improvement.rst
new file mode 100644
index 0000000..1aad54e
--- /dev/null
+++ b/changelog/69.improvement.rst
@@ -0,0 +1 @@
+Add an Elasticsearch forwarder (including example usage).
diff --git a/docker/elastic/.env b/docker/elastic/.env
new file mode 100644
index 0000000..3702ec4
--- /dev/null
+++ b/docker/elastic/.env
@@ -0,0 +1,30 @@
+# Password for the 'elastic' user (at least 6 characters)
+ELASTIC_PASSWORD=elastic
+
+# Password for the 'kibana_system' user (at least 6 characters)
+KIBANA_PASSWORD=kibana
+
+# Version of Elastic products
+#STACK_VERSION=8.8.2
+STACK_VERSION=7.17.11
+
+# Set the cluster name
+CLUSTER_NAME=docker-cluster
+
+# Set to 'basic' or 'trial' to automatically start the 30-day trial
+LICENSE=basic
+#LICENSE=trial
+
+# Port to expose Elasticsearch HTTP API to the host
+ES_PORT=9200
+#ES_PORT=127.0.0.1:9200
+
+# Port to expose Kibana to the host
+KIBANA_PORT=5601
+#KIBANA_PORT=80
+
+# Increase or decrease based on the available host memory (in bytes)
+MEM_LIMIT=2073741824
+
+# Project namespace (defaults to the current folder name if not set)
+#COMPOSE_PROJECT_NAME=myproject
diff --git a/docker/elastic/centosstream8.Dockerfile b/docker/elastic/centosstream8.Dockerfile
new file mode 100644
index 0000000..823aaf3
--- /dev/null
+++ b/docker/elastic/centosstream8.Dockerfile
@@ -0,0 +1,39 @@
+FROM ghcr.io/saltstack/salt-ci-containers/centos-stream:8 as base
+
+ENV LANG=C.UTF-8
+ENV LANGUAGE=C.UTF-8
+RUN ln -sf /usr/share/zoneinfo/America/Denver /etc/localtime
+
+RUN dnf update -y \
+  && dnf upgrade -y \
+  && dnf install -y sed vim tmux sudo tree net-tools bind-utils lsof nmap which binutils iputils epel-release procps \
+  && dnf install -y --allowerasing curl \
+  && dnf install -y multitail supervisor
+
+RUN mkdir -p /etc/supervisor/conf.d/
+ADD docker/elastic/conf/supervisord.conf /etc/supervisord.conf
+
+RUN rpm --import https://repo.saltproject.io/salt/py3/redhat/9/x86_64/SALT-PROJECT-GPG-PUBKEY-2023.pub \
+  && curl -fsSL https://repo.saltproject.io/salt/py3/redhat/9/x86_64/3006.repo | tee /etc/yum.repos.d/salt.repo \
+  && dnf install -y salt
+
+COPY ../../dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \
+  && rm -f /src/*.whl
+
+COPY ../../examples/dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \
+  && rm -f /src/*.whl
+
+
+FROM base as minion-2
+
+RUN dnf install -y salt-minion
+ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf
+ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf
+ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf
+ADD docker/elastic/conf/minion-2.conf /etc/salt/minion.d/minion-2.conf
+
+CMD ["/usr/bin/supervisord","-c","/etc/supervisord.conf"]
diff --git a/docker/elastic/centosstream9.Dockerfile b/docker/elastic/centosstream9.Dockerfile
new file mode 100644
index 0000000..cad10f7
--- /dev/null
+++ b/docker/elastic/centosstream9.Dockerfile
@@ -0,0 +1,52 @@
+FROM ghcr.io/saltstack/salt-ci-containers/centos-stream:9 as base
+
+ENV LANG=C.UTF-8
+ENV LANGUAGE=C.UTF-8
+RUN ln -sf /usr/share/zoneinfo/America/Denver /etc/localtime
+
+RUN dnf update -y \
+  && dnf upgrade -y \
+  && dnf install -y sed vim tmux sudo tree net-tools bind-utils lsof nmap which binutils iputils epel-release procps \
+  && dnf install -y --allowerasing curl \
+  && dnf install -y multitail supervisor
+
+RUN mkdir -p /etc/supervisor/conf.d/
+ADD docker/elastic/conf/supervisord.conf /etc/supervisord.conf
+
+RUN rpm --import https://repo.saltproject.io/salt/py3/redhat/9/x86_64/SALT-PROJECT-GPG-PUBKEY-2023.pub \
+  && curl -fsSL https://repo.saltproject.io/salt/py3/redhat/9/x86_64/3006.repo | tee /etc/yum.repos.d/salt.repo \
+  && dnf install -y salt
+
+COPY ../../dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \
+  && rm -f /src/*.whl
+
+COPY ../../examples/dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \
+  && rm -f /src/*.whl
+
+
+FROM base as master-1
+
+RUN dnf install -y salt-master
+ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf
+ADD docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-jobs.conf
+ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh
+ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf
+ADD docker/elastic/conf/master-1.conf /etc/salt/master.d/master-1.conf
+ADD docker/elastic/state-tree/*.sls /srv/salt/
+
+CMD ["/usr/bin/supervisord","-c","/etc/supervisord.conf"]
+
+
+FROM base as minion-1
+
+RUN dnf install -y salt-minion
+ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf
+ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf
+ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf
+ADD docker/elastic/conf/minion-1.conf /etc/salt/minion.d/minion-1.conf
+
+CMD ["/usr/bin/supervisord","-c","/etc/supervisord.conf"]
diff --git a/docker/elastic/conf/analytics.master.conf b/docker/elastic/conf/analytics.master.conf
new file mode 100644
index 0000000..a8b679b
--- /dev/null
+++ b/docker/elastic/conf/analytics.master.conf
@@ -0,0 +1,57 @@
+engines:
+  - analytics
+
+analytics:
+
+  collectors:
+    jobs-collector:
+      plugin: event-bus
+      tags:
+        - "salt/job/*"
+
+    grains-collector:
+      plugin: grains
+      interval: 30
+      grains:
+        - "os"
+        - "id"
+        - "role"
+        - "datacenter"
+
+  processors:
+    job-aggregate:
+      plugin: job-aggregate
+    state-job-aggregate:
+      plugin: job-aggregate
+      jobs:
+        - "state.*"
+    cast-to-es:
+      plugin: jobs_to_es
+    state-cast-to-es:
+      plugin: state_jobs_to_es
+
+  forwarders:
+    elasticsearch-forwarder:
+      plugin: elasticsearch
+      hosts:
+        - http://node01:9200
+
+  pipelines:
+
+    jobs-pipeline:
+      collect:
+        - grains-collector
+        - jobs-collector
+      process:
+        - job-aggregate
+        - cast-to-es
+      forward: elasticsearch-forwarder
+
+    state-jobs-pipeline:
+      collect:
+        - grains-collector
+        - jobs-collector
+      process:
+        - state-job-aggregate
+        - state-cast-to-es
+      forward: elasticsearch-forwarder
diff --git a/docker/elastic/conf/analytics.minion.conf b/docker/elastic/conf/analytics.minion.conf
new file mode 100644
index 0000000..a7cf3c6
--- /dev/null
+++ b/docker/elastic/conf/analytics.minion.conf
@@ -0,0 +1,31 @@
+engines:
+  - analytics
+
+analytics:
+
+  collectors:
+    beacons-collector:
+      plugin: beacons
+      beacons:
+        - "*"
+
+  processors:
+    cast-to-es:
+      plugin: beacons_to_es
+
+  forwarders:
+    elasticsearch-forwarder:
+      plugin: elasticsearch
+      hosts:
+        - http://node01:9200
+      use_ssl: false
+      verify_ssl: false
+
+  pipelines:
+    elastic-pipeline:
+      collect:
+        - beacons-collector
+      process:
+        - cast-to-es
+      forward:
+        - elasticsearch-forwarder
diff --git a/docker/elastic/conf/beacons.conf b/docker/elastic/conf/beacons.conf
new file mode 100644
index 0000000..c15fdde
--- /dev/null
+++ b/docker/elastic/conf/beacons.conf
@@ -0,0 +1,17 @@
+beacons:
+#  diskusage:
+#    - '^\/(?!home).*$': 1%
+#    - '^[a-zA-Z]:\\$': 1%
+#    - interval: 5
+  memusage:
+    - percent: 1%
+    - interval: 5
+#  salt_monitor:
+#    - salt_fun:
+#        - test.ping
+#    - interval: 5
+##  status:
+##    - interval: 5
+##  swapusage:
+##    - percent: 1%
+##    - interval: 5
diff --git a/docker/elastic/conf/master-1.conf b/docker/elastic/conf/master-1.conf
new file mode 100644
index 0000000..591e796
--- /dev/null
+++ b/docker/elastic/conf/master-1.conf
@@ -0,0 +1,5 @@
+id: master-1
+open_mode: true
+log_level_logfile: debug
+grains:
+  datacenter: dt-a
diff --git a/docker/elastic/conf/master-2.conf b/docker/elastic/conf/master-2.conf
new file mode 100644
index 0000000..eec4045
--- /dev/null
+++ b/docker/elastic/conf/master-2.conf
@@ -0,0 +1,5 @@
+id: master-2
+open_mode: true
+log_level_logfile: debug
+grains:
+  datacenter: dt-b
diff --git a/docker/elastic/conf/minion-1.conf b/docker/elastic/conf/minion-1.conf
new file mode 100644
index 0000000..d4a221d
--- /dev/null
+++ b/docker/elastic/conf/minion-1.conf
@@ -0,0 +1,6 @@
+id: minion-1
+master: master-1
+log_level_logfile: debug
+grains:
+  role: db
+  datacenter: dt-a
diff --git a/docker/elastic/conf/minion-2.conf b/docker/elastic/conf/minion-2.conf
new file mode 100644
index 0000000..44a42b8
--- /dev/null
+++ b/docker/elastic/conf/minion-2.conf
@@ -0,0 +1,6 @@
+id: minion-2
+master: master-1
+log_level_logfile: debug
+grains:
+  role: web
+  datacenter: dt-a
diff --git a/docker/elastic/conf/minion-3.conf b/docker/elastic/conf/minion-3.conf
new file mode 100644
index 0000000..889ba7e
--- /dev/null
+++ b/docker/elastic/conf/minion-3.conf
@@ -0,0 +1,6 @@
+id: minion-3
+master: master-2
+log_level_logfile: debug
+grains:
+  role: db
+  datacenter: dt-b
diff --git a/docker/elastic/conf/minion-4.conf b/docker/elastic/conf/minion-4.conf
new file mode 100644
index 0000000..52a390f
--- /dev/null
+++ b/docker/elastic/conf/minion-4.conf
@@ -0,0 +1,6 @@
+id: minion-4
+master: master-2
+log_level_logfile: debug
+grains:
+  role: web
+  datacenter: dt-b
diff --git a/docker/elastic/conf/supervisord.conf b/docker/elastic/conf/supervisord.conf
new file mode 100644
index 0000000..c48ac50
--- /dev/null
+++ b/docker/elastic/conf/supervisord.conf
@@ -0,0 +1,29 @@
+[supervisord]
+user=root
+logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
+logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB)
+logfile_backups=10 ; (num of main logfile rotation backups;default 10)
+loglevel=info ; (logging level;default info; others: debug,warn)
+pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
+nodaemon=true ; (start in foreground if true;default false)
+minfds=1024 ; (min. avail startup file descriptors;default 1024)
+minprocs=200 ; (min. avail process descriptors;default 200)
+
+[unix_http_server]
+file=/var/tmp/supervisor.sock
+username=setup
+password=setup
+
+[supervisorctl]
+serverurl=unix:///var/tmp/supervisor.sock ; use a unix:// URL for a unix socket
+username=setup
+password=setup
+
+; the below section must remain in the config file for RPC
+; (supervisorctl/web interface) to work, additional interfaces may be
+; added by defining them in separate rpcinterface: sections
+[rpcinterface:supervisor]
+supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
+
+[include]
+files = /etc/supervisor/conf.d/*.conf
diff --git a/docker/elastic/conf/supervisord.loop-jobs.conf b/docker/elastic/conf/supervisord.loop-jobs.conf
new file mode 100644
index 0000000..6a51e74
--- /dev/null
+++ b/docker/elastic/conf/supervisord.loop-jobs.conf
@@ -0,0 +1,2 @@
+[program:loop-jobs]
+command=/bin/sh /usr/bin/loop-jobs.sh
diff --git a/docker/elastic/conf/supervisord.master.conf b/docker/elastic/conf/supervisord.master.conf
new file mode 100644
index 0000000..1a85cc2
--- /dev/null
+++ b/docker/elastic/conf/supervisord.master.conf
@@ -0,0 +1,2 @@
+[program:salt-master]
+command=/opt/saltstack/salt/salt-master -l debug
diff --git a/docker/elastic/conf/supervisord.minion.conf b/docker/elastic/conf/supervisord.minion.conf
new file mode 100644
index 0000000..4827aa9
--- /dev/null
+++ b/docker/elastic/conf/supervisord.minion.conf
@@ -0,0 +1,2 @@
+[program:salt-minion]
+command=/opt/saltstack/salt/salt-minion -l debug
diff --git a/docker/elastic/debian10.Dockerfile b/docker/elastic/debian10.Dockerfile
new file mode 100644
index 0000000..2c3d82f
--- /dev/null
+++ b/docker/elastic/debian10.Dockerfile
@@ -0,0 +1,39 @@
+FROM ghcr.io/saltstack/salt-ci-containers/debian:10 as base
+
+ENV LANG=C.UTF-8
+ENV LANGUAGE=C.UTF-8
+RUN ln -sf /usr/share/zoneinfo/America/Denver /etc/localtime
+
+RUN apt update \
+  && apt upgrade -y \
+  && apt install -y curl sed vim tmux sudo tree net-tools bind9utils lsof nmap binutils multitail supervisor iputils-ping procps
+
+RUN mkdir -p /etc/supervisor/conf.d/
+ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf
+
+RUN mkdir /etc/apt/keyrings \
+  && curl -fsSL -o /etc/apt/keyrings/salt-archive-keyring-2023.gpg https://repo.saltproject.io/salt/py3/debian/11/amd64/SALT-PROJECT-GPG-PUBKEY-2023.gpg \
+  && echo "deb [signed-by=/etc/apt/keyrings/salt-archive-keyring-2023.gpg arch=amd64] https://repo.saltproject.io/salt/py3/debian/11/amd64/3006 bullseye main" | tee /etc/apt/sources.list.d/salt.list \
+  && apt update \
+  && apt install -y salt-common
+
+COPY ../../dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \
+  && rm -f /src/*.whl
+
+COPY ../../examples/dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \
+  && rm -f /src/*.whl
+
+
+FROM base as minion-4
+
+RUN apt install -y salt-minion
+ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf
+ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf
+ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf
+ADD docker/elastic/conf/minion-4.conf /etc/salt/minion.d/minion-4.conf
+
+CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"]
diff --git a/docker/elastic/debian11.Dockerfile b/docker/elastic/debian11.Dockerfile
new file mode 100644
index 0000000..9383799
--- /dev/null
+++ b/docker/elastic/debian11.Dockerfile
@@ -0,0 +1,52 @@
+FROM ghcr.io/saltstack/salt-ci-containers/debian:11 as base
+
+ENV LANG=C.UTF-8
+ENV LANGUAGE=C.UTF-8
+RUN ln -sf /usr/share/zoneinfo/America/Denver /etc/localtime
+
+RUN apt update \
+  && apt upgrade -y \
+  && apt install -y curl sed vim tmux sudo tree net-tools bind9-utils lsof nmap binutils multitail supervisor iputils-ping procps
+
+RUN mkdir -p /etc/supervisor/conf.d/
+ADD docker/elastic/conf/supervisord.conf /etc/supervisor/supervisord.conf
+
+RUN mkdir /etc/apt/keyrings \
+  && curl -fsSL -o /etc/apt/keyrings/salt-archive-keyring-2023.gpg https://repo.saltproject.io/salt/py3/debian/11/amd64/SALT-PROJECT-GPG-PUBKEY-2023.gpg \
+  && echo "deb [signed-by=/etc/apt/keyrings/salt-archive-keyring-2023.gpg arch=amd64] https://repo.saltproject.io/salt/py3/debian/11/amd64/3006 bullseye main" | tee /etc/apt/sources.list.d/salt.list \
+  && apt update \
+  && apt install -y salt-common
+
+COPY ../../dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install /src/salt_analytics_framework*.whl \
+  && rm -f /src/*.whl
+
+COPY ../../examples/dist/salt*.whl /src/
+RUN ls -lah /src \
+  && /opt/saltstack/salt/salt-pip install --find-links /src/ salt-analytics.examples[elasticsearch] \
+  && rm -f /src/*.whl
+
+
+FROM base as master-2
+
+RUN apt install -y salt-master
+ADD docker/elastic/conf/supervisord.master.conf /etc/supervisor/conf.d/master.conf
+ADD docker/elastic/conf/supervisord.loop-jobs.conf /etc/supervisor/conf.d/loop-jobs.conf
+ADD docker/elastic/loop-jobs.sh /usr/bin/loop-jobs.sh
+ADD docker/elastic/conf/analytics.master.conf /etc/salt/master.d/salt-analytics.conf
+ADD docker/elastic/conf/master-2.conf /etc/salt/master.d/master-2.conf
+ADD docker/elastic/state-tree/*.sls /srv/salt/
+
+CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"]
+
+
+FROM base as minion-3
+
+RUN apt install -y salt-minion
+ADD docker/elastic/conf/supervisord.minion.conf /etc/supervisor/conf.d/minion.conf
+ADD docker/elastic/conf/beacons.conf /etc/salt/minion.d/beacons.conf
+ADD docker/elastic/conf/analytics.minion.conf /etc/salt/minion.d/salt-analytics.conf
+ADD docker/elastic/conf/minion-3.conf /etc/salt/minion.d/minion-3.conf
+
+CMD ["/usr/bin/supervisord","-c","/etc/supervisor/supervisord.conf"]
diff --git a/docker/elastic/docker-compose.yml b/docker/elastic/docker-compose.yml
new file mode 100644
index 0000000..3394f22
--- /dev/null
+++ b/docker/elastic/docker-compose.yml
@@ -0,0 +1,168 @@
+version: '3.8'
+services:
+  node01:
+    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
+    container_name: node01
+    environment:
+      - node.name=node01
+      - cluster.name=es-cluster-7
+      - discovery.type=single-node
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    volumes:
+      - es-data01:/usr/share/elasticsearch/data
+    ports:
+      - ${ES_PORT}:9200
+    networks:
+      - es-network
+
+  kibana:
+    image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
+    container_name: kibana
+    environment:
+      ELASTICSEARCH_HOSTS: http://node01:9200
+    ports:
+      - ${KIBANA_PORT}:5601
+    networks:
+      - es-network
+    depends_on:
+      - node01
+    healthcheck:
+      test:
+        [
+          "CMD-SHELL",
+          "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
+        ]
+      interval: 10s
+      timeout: 10s
+      retries: 120
+
+  metricbeat:
+    image: docker.elastic.co/beats/metricbeat:${STACK_VERSION}
+    container_name: metricbeat
+    environment:
+      ELASTICSEARCH_HOSTS: http://node01:9200
+    volumes:
+      - metricbeat-data01:/usr/share/metricbeat/data
+    networks:
+      - es-network
+    depends_on:
+      - node01
+
+  master-1:
+    depends_on:
+      kibana:
+        condition: service_healthy
+    container_name: master-1
+    build:
+      context: ../../
+      dockerfile: docker/elastic/centosstream9.Dockerfile
+      target: master-1
+    networks:
+      es-network:
+        aliases:
+          - master-1
+    healthcheck:
+      test:
+        [
+          "CMD-SHELL",
+          "ps $(cat /var/run/salt-master.pid) > /dev/null 2>&1 && test -S /var/run/salt/master/workers.ipc",
+        ]
+      interval: 5s
+      timeout: 10s
+      retries: 120
+  minion-1:
+    depends_on:
+      kibana:
+        condition: service_healthy
+      master-1:
+        condition: service_healthy
+    container_name: minion-1
+    build:
+      context: ../../
+      dockerfile: docker/elastic/centosstream9.Dockerfile
+      target: minion-1
+    networks:
+      es-network:
+        aliases:
+          - minion-1
+  minion-2:
+    depends_on:
+      kibana:
+        condition: service_healthy
+      master-1:
+        condition: service_healthy
+    container_name: minion-2
+    build:
+      context: ../../
+      dockerfile: docker/elastic/centosstream8.Dockerfile
+      target: minion-2
+    networks:
+      es-network:
+        aliases:
+          - minion-2
+  master-2:
+    depends_on:
+      kibana:
+        condition: service_healthy
+    container_name: master-2
+    build:
+      context: ../../
+      dockerfile: docker/elastic/debian11.Dockerfile
+      target: master-2
+    networks:
+      es-network:
+        aliases:
+          - master-2
+    healthcheck:
+      test:
+        [
+          "CMD-SHELL",
+          "ps $(cat /var/run/salt-master.pid) > /dev/null 2>&1 && test -S /var/run/salt/master/workers.ipc",
+        ]
+      interval: 5s
+      timeout: 10s
+      retries: 120
+  minion-3:
+    depends_on:
+      kibana:
+        condition: service_healthy
+      master-2:
+        condition: service_healthy
+    container_name: minion-3
+    build:
+      context: ../../
+      dockerfile: docker/elastic/debian11.Dockerfile
+      target: minion-3
+    networks:
+      es-network:
+        aliases:
+          - minion-3
+  minion-4:
+    depends_on:
+      kibana:
+        condition: service_healthy
+      master-2:
+        condition: service_healthy
+    container_name: minion-4
+    build:
+      context: ../../
+      dockerfile: docker/elastic/debian10.Dockerfile
+      target: minion-4
+    networks:
+      es-network:
+        aliases:
+          - minion-4
+
+volumes:
+  es-data01:
+    driver: local
+  metricbeat-data01:
+    driver: local
+
+networks:
+  es-network:
+    driver: bridge
diff --git a/docker/elastic/loop-jobs.sh b/docker/elastic/loop-jobs.sh
new file mode 100644
index 0000000..d6d9871
--- /dev/null
+++ b/docker/elastic/loop-jobs.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+sleep 20
+while true; do
+  SLEEP=$(shuf -i 3-10 -n 1)
+  sleep $SLEEP
+  /usr/bin/salt \* test.fib 3
+  SLEEP=$(shuf -i 3-10 -n 1)
+  sleep $SLEEP
+  /usr/bin/salt \* test.ping
+  SLEEP=$(shuf -i 3-10 -n 1)
+  sleep $SLEEP
+  /usr/bin/salt \* state.sls add-user
+  SLEEP=$(shuf -i 3-10 -n 1)
+  sleep $SLEEP
+  /usr/bin/salt \* state.sls remove-user
+  SLEEP=$(shuf -i 3-10 -n 1)
+  sleep $SLEEP
+  /usr/bin/salt \* state.sls install-nginx
+  SLEEP=$(shuf -i 3-10 -n 1)
+  sleep $SLEEP
+  /usr/bin/salt \* state.sls uninstall-nginx
+done
diff --git a/docker/elastic/state-tree/add-user.sls b/docker/elastic/state-tree/add-user.sls
new file mode 100644
index 0000000..478c527
--- /dev/null
+++ b/docker/elastic/state-tree/add-user.sls
@@ -0,0 +1,4 @@
+james-bond:
+  user.present:
+    - createhome: true
+    - empty_password: true
diff --git a/docker/elastic/state-tree/install-nginx.sls b/docker/elastic/state-tree/install-nginx.sls
new file mode 100644
index 0000000..517dce7
--- /dev/null
+++ b/docker/elastic/state-tree/install-nginx.sls
@@ -0,0 +1,2 @@
+nginx:
+  pkg.installed
diff --git a/docker/elastic/state-tree/remove-user.sls b/docker/elastic/state-tree/remove-user.sls
new file mode 100644
index 0000000..de801bf
--- /dev/null
+++ b/docker/elastic/state-tree/remove-user.sls
@@ -0,0 +1,4 @@
+james-bond:
+  user.absent:
+    - purge: true
+    - force: true
diff --git a/docker/elastic/state-tree/uninstall-nginx.sls b/docker/elastic/state-tree/uninstall-nginx.sls
new file mode 100644
index 0000000..5edc4bb
--- /dev/null
+++ b/docker/elastic/state-tree/uninstall-nginx.sls
@@ -0,0 +1,2 @@
+nginx:
+  pkg.removed
diff --git a/examples/requirements/all.txt b/examples/requirements/all.txt
index 3acf00e..7d81ef1 100644
--- a/examples/requirements/all.txt
+++ b/examples/requirements/all.txt
@@ -1,2 +1,3 @@
+-r elasticsearch.txt
 -r mnist-notebook.txt
 -r mnist.txt
diff --git a/examples/requirements/elasticsearch.txt b/examples/requirements/elasticsearch.txt
new file mode 100644
index 0000000..8c7dabc
--- /dev/null
+++ b/examples/requirements/elasticsearch.txt
@@ -0,0 +1 @@
+elasticsearch[async]
diff --git a/examples/setup.cfg b/examples/setup.cfg
index 9c48d80..1f502a5 100644
--- a/examples/setup.cfg
+++ b/examples/setup.cfg
@@ -48,6 +48,9 @@ saf.process =
     mnist_network = saltext.safexamples.process.mnist_network
     notebook_output = saltext.safexamples.process.notebook_output
     numpy_save_keys = saltext.safexamples.process.numpy_save_keys
+    beacons_to_es = saltext.safexamples.process.beacons_to_es
+    jobs_to_es = saltext.safexamples.process.jobs_to_es
+    state_jobs_to_es = saltext.safexamples.process.state_jobs_to_es
 
 [bdist_wheel]
 # Use this option if your package is pure-python
@@ -56,3 +59,11 @@ universal = 1
 [sdist]
 owner = root
 group = root
+
+
+[requirements-files]
+extras_require =
+    all = requirements/all.txt
+    elasticsearch = requirements/elasticsearch.txt
+    mnist = requirements/mnist.txt
+    mnist-notebook = requirements/mnist-notebook.txt
diff --git a/examples/src/saltext/safexamples/process/beacons_to_es.py b/examples/src/saltext/safexamples/process/beacons_to_es.py
new file mode 100644
index 0000000..afc9ea9
--- /dev/null
+++ b/examples/src/saltext/safexamples/process/beacons_to_es.py
@@ -0,0 +1,64 @@
+# Copyright 2021-2023 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+"""
+Convert a beacon event to an Elasticsearch index event.
+"""
+from __future__ import annotations
+
+import logging
+import pprint
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import AsyncIterator
+from typing import Type
+from typing import cast
+
+from saf.forward.elasticsearch import ElasticSearchEvent
+from saf.models import PipelineRunContext
+from saf.models import ProcessConfigBase
+
+if TYPE_CHECKING:
+    from saf.collect.beacons import BeaconCollectedEvent
+log = logging.getLogger(__name__)
+
+
+class BeaconToESConfig(ProcessConfigBase):
+    """
+    Processor configuration.
+    """
+
+
+def get_config_schema() -> Type[BeaconToESConfig]:
+    """
+    Get the beacons-to-ES process plugin configuration schema.
+    """
+    return BeaconToESConfig
+
+
+async def process(
+    *,
+    ctx: PipelineRunContext[BeaconToESConfig],
+    event: BeaconCollectedEvent,
+) -> AsyncIterator[ElasticSearchEvent]:
+    """
+    Method called to process each collected beacon event.
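+
+    For context, a minimal sketch of where this processor can sit in a
+    minion pipeline (names taken from the example ``analytics.minion.conf``):
+
+    .. code-block:: yaml
+
+        pipelines:
+          elastic-pipeline:
+            collect:
+              - beacons-collector
+            process:
+              - cast-to-es
+            forward:
+              - elasticsearch-forwarder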
+    """
+    data = cast(dict[str, Any], event.data).copy()
+    data["role"] = ctx.info.salt.role
+    evt = ElasticSearchEvent.construct(index=event.beacon, data=data)
+    log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict()))
+    yield evt
diff --git a/examples/src/saltext/safexamples/process/jobs_to_es.py b/examples/src/saltext/safexamples/process/jobs_to_es.py
new file mode 100644
index 0000000..8d2b9b0
--- /dev/null
+++ b/examples/src/saltext/safexamples/process/jobs_to_es.py
@@ -0,0 +1,57 @@
+# Copyright 2021-2023 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+"""
+Convert an aggregated Salt job event to an Elasticsearch index event.
+"""
+from __future__ import annotations
+
+import json
+import logging
+import pprint
+from typing import TYPE_CHECKING
+from typing import AsyncIterator
+from typing import Type
+
+from saf.forward.elasticsearch import ElasticSearchEvent
+from saf.models import PipelineRunContext
+from saf.models import ProcessConfigBase
+
+if TYPE_CHECKING:
+    from saf.process.job_aggregate import JobAggregateCollectedEvent
+
+log = logging.getLogger(__name__)
+
+
+class SaltJobsToESConfig(ProcessConfigBase):
+    """
+    Processor configuration.
+    """
+
+
+def get_config_schema() -> Type[SaltJobsToESConfig]:
+    """
+    Get the jobs-to-ES process plugin configuration schema.
+    """
+    return SaltJobsToESConfig
+
+
+async def process(
+    *,
+    ctx: PipelineRunContext[SaltJobsToESConfig],  # noqa: ARG001
+    event: JobAggregateCollectedEvent,
+) -> AsyncIterator[ElasticSearchEvent]:
+    """
+    Method called to process each aggregated job event.
+    """
+    data = event.dict()
+    data.pop("data", None)
+    data.update(event.data)
+    # Some fields must be cast to JSON strings or ES will complain about
+    # different types
+    for key in ("fun_args", "return"):
+        if key in data:
+            data[key] = json.dumps(data[key])
+    data["@timestamp"] = event.start_time
+    evt = ElasticSearchEvent.construct(index="salt_jobs", data=data)
+    log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict()))
+    yield evt
diff --git a/examples/src/saltext/safexamples/process/state_jobs_to_es.py b/examples/src/saltext/safexamples/process/state_jobs_to_es.py
new file mode 100644
index 0000000..a18ac62
--- /dev/null
+++ b/examples/src/saltext/safexamples/process/state_jobs_to_es.py
@@ -0,0 +1,61 @@
+# Copyright 2021-2023 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+"""
+Convert an aggregated Salt state job event to an Elasticsearch index event.
+"""
+from __future__ import annotations
+
+import json
+import logging
+import pprint
+from typing import TYPE_CHECKING
+from typing import AsyncIterator
+from typing import Type
+
+from saf.forward.elasticsearch import ElasticSearchEvent
+from saf.models import PipelineRunContext
+from saf.models import ProcessConfigBase
+
+if TYPE_CHECKING:
+    from saf.process.job_aggregate import JobAggregateCollectedEvent
+
+log = logging.getLogger(__name__)
+
+
+class SaltStateJobsToESConfig(ProcessConfigBase):
+    """
+    Processor configuration.
+    """
+
+
+def get_config_schema() -> Type[SaltStateJobsToESConfig]:
+    """
+    Get the state-jobs-to-ES process plugin configuration schema.
+    """
+    return SaltStateJobsToESConfig
+
+
+async def process(
+    *,
+    ctx: PipelineRunContext[SaltStateJobsToESConfig],  # noqa: ARG001
+    event: JobAggregateCollectedEvent,
+) -> AsyncIterator[ElasticSearchEvent]:
+    """
+    Method called to process each aggregated state job event.
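+
+    For illustration: for ``state.sls add-user``, the first entry in
+    ``fun_args`` (``add-user``) is recorded as ``state_name``.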
+    """
+    data = event.dict()
+    data.pop("data", None)
+    data.update(event.data)
+    data["state_name"] = data["fun_args"][0]
+    # Some fields must be cast to JSON strings or ES will complain about
+    # different types
+    for key in ("fun_args", "return"):
+        if key in data:
+            data[key] = json.dumps(data[key])
+    data["@timestamp"] = event.start_time
+    evt = ElasticSearchEvent.construct(index="salt_state_runs", data=data)
+    log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict()))
+    yield evt
diff --git a/pyproject.toml b/pyproject.toml
index 09a0538..2431e6e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -81,6 +81,8 @@ ignore = [
     "PERF203", # `try`-`except` within a loop incurs performance overhead"
     "PERF401", # Use a list comprehension to create a transformed list
     "PERF402", # Use `list` or `list.copy` to create a copy of a list
+    "FIX002",  # Line contains TODO
+    "TD003",   # Missing issue link on the line following this TODO
 ]
 ignore-init-module-imports = true
 builtins = [
diff --git a/requirements/elasticsearch.txt b/requirements/elasticsearch.txt
new file mode 100644
index 0000000..8c7dabc
--- /dev/null
+++ b/requirements/elasticsearch.txt
@@ -0,0 +1 @@
+elasticsearch[async]
diff --git a/setup.cfg b/setup.cfg
index d92d255..c7f9833 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -50,18 +50,22 @@ salt.loader =
     salt-analytics-framework = saf.saltext
 saf.collect =
     beacons = saf.collect.beacons
+    event-bus = saf.collect.event_bus
     file = saf.collect.file
     salt_exec = saf.collect.salt_exec
+    grains = saf.collect.grains
     test = saf.collect.test
 saf.process =
     regex_mask = saf.process.regex_mask
     shannon_mask = saf.process.shannon_mask
     jupyter_notebook = saf.process.jupyter_notebook
+    job-aggregate = saf.process.job_aggregate
     test = saf.process.test
 saf.forward =
     disk = saf.forward.disk
     noop = saf.forward.noop
     test = saf.forward.test
+    elasticsearch = saf.forward.elasticsearch
 
 
 [requirements-files]
@@ -75,6 +79,7 @@ extras_require =
     changelog = requirements/changelog.txt
     build = requirements/build.txt
    jupyter = requirements/jupyter.txt
+    elasticsearch = requirements/elasticsearch.txt
 
 
 [bdist_wheel]
diff --git a/src/saf/collect/beacons.py b/src/saf/collect/beacons.py
index f517988..22bf4a2 100644
--- a/src/saf/collect/beacons.py
+++ b/src/saf/collect/beacons.py
@@ -46,6 +46,7 @@ class BeaconCollectedEvent(CollectedEvent):
     """
 
     beacon: str
+    id: str  # noqa: A003
     tag: str
     stamp: datetime
     raw_data: Dict[str, Any]
@@ -84,8 +85,16 @@ async def collect(*, ctx: PipelineRunContext[BeaconsConfig]) -> AsyncIterator[Be
     tags = {f"salt/beacon/*/{beacon}/*" for beacon in config.beacons}
     log.info("The beacons collect plugin is configured to listen to tags: %s", tags)
     async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=tags):
+        if "beacon_name" not in salt_event.raw_data:
+            # TODO @s0undt3ch: We're listening master side, and the payload is not the same... Fix it?
+            continue
+        daemon_id = salt_event.data.get("id")
+        if daemon_id is None:
+            tag_parts = salt_event.tag.split("/")
+            daemon_id = tag_parts[2]
         yield BeaconCollectedEvent(
             beacon=salt_event.raw_data["beacon_name"],
+            id=daemon_id,
             tag=salt_event.tag,
             stamp=salt_event.stamp,
             data=salt_event.data,
diff --git a/src/saf/collect/event_bus.py b/src/saf/collect/event_bus.py
new file mode 100644
index 0000000..997da3f
--- /dev/null
+++ b/src/saf/collect/event_bus.py
@@ -0,0 +1,66 @@
+# Copyright 2021-2023 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+"""
+Collect events from the event bus.
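+
+For example, a collector listening for job events might be configured
+like this (mirroring the example ``analytics.master.conf``):
+
+.. code-block:: yaml
+
+    collectors:
+      jobs-collector:
+        plugin: event-bus
+        tags:
+          - "salt/job/*"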
+"""
+from __future__ import annotations
+
+import logging
+from typing import AsyncIterator
+from typing import Set
+from typing import Type
+
+from saf.models import CollectConfigBase
+from saf.models import CollectedEvent
+from saf.models import PipelineRunContext
+from saf.models import SaltEvent
+from saf.utils import eventbus
+
+log = logging.getLogger(__name__)
+
+
+class EventBusConfig(CollectConfigBase):
+    """
+    Configuration schema for the event bus collect plugin.
+    """
+
+    tags: Set[str]
+
+
+def get_config_schema() -> Type[EventBusConfig]:
+    """
+    Get the event bus collect plugin configuration schema.
+    """
+    return EventBusConfig
+
+
+class EventBusCollectedEvent(CollectedEvent):
+    """
+    A collected event surrounding a SaltEvent.
+    """
+
+    salt_event: SaltEvent
+
+
+async def collect(
+    *, ctx: PipelineRunContext[EventBusConfig]
+) -> AsyncIterator[EventBusCollectedEvent]:
+    """
+    Method called to collect events.
+    """
+    config = ctx.config
+    salt_event: SaltEvent
+    log.info("The event bus collect plugin is configured to listen to tags: %s", config.tags)
+    async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=config.tags):
+        yield EventBusCollectedEvent.construct(salt_event=salt_event, data={"tag": salt_event.tag})
diff --git a/src/saf/collect/grains.py b/src/saf/collect/grains.py
new file mode 100644
index 0000000..4282cba
--- /dev/null
+++ b/src/saf/collect/grains.py
@@ -0,0 +1,65 @@
+# Copyright 2021-2023 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+"""
+Collect grains from minions on a set interval.
+"""
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import Any
+from typing import AsyncIterator
+from typing import Dict
+from typing import List
+from typing import Type
+
+from pydantic import Field
+from salt.client import LocalClient
+
+from saf.models import CollectConfigBase
+from saf.models import CollectedEvent
+from saf.models import PipelineRunContext
+
+log = logging.getLogger(__name__)
+
+
+class GrainsConfig(CollectConfigBase):
+    """
+    Configuration schema for the grains collect plugin.
+    """
+
+    targets: str = Field(default="*")
+    grains: List[str]
+    interval: float = Field(default=5)
+
+
+def get_config_schema() -> Type[GrainsConfig]:
+    """
+    Get the grains collect plugin configuration schema.
+    """
+    return GrainsConfig
+
+
+class GrainsCollectedEvent(CollectedEvent):
+    """
+    A collected event containing a minion's grains.
+    """
+
+    minion: str
+    grains: Dict[str, Any]
+
+
+async def collect(*, ctx: PipelineRunContext[GrainsConfig]) -> AsyncIterator[GrainsCollectedEvent]:
+    """
+    Method called to collect events.
+    """
+    config = ctx.config
+    client = LocalClient(mopts=ctx.salt_config.copy())
+
+    while True:
+        ret = client.cmd(config.targets, "grains.item", arg=config.grains)
+        for minion, grains in ret.items():
+            if grains:
+                event = GrainsCollectedEvent.construct(data=ret, minion=minion, grains=grains)
+                yield event
+        await asyncio.sleep(config.interval)
diff --git a/src/saf/forward/elasticsearch.py b/src/saf/forward/elasticsearch.py
new file mode 100644
index 0000000..03fa381
--- /dev/null
+++ b/src/saf/forward/elasticsearch.py
@@ -0,0 +1,154 @@
+# Copyright 2021-2023 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+"""
+ElasticSearch forwarder.
+
+Requires the elasticsearch-py python library.
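+
+An example forwarder configuration (host, credentials and index name are
+illustrative):
+
+.. code-block:: yaml
+
+    forwarders:
+      elasticsearch-forwarder:
+        plugin: elasticsearch
+        hosts:
+          - http://node01:9200
+        username: elastic
+        password: elastic
+        index: salt-events
+
+If ``index`` is unset, forwarded events must be ``ElasticSearchEvent`` instances carrying their own index.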
+"""
+from __future__ import annotations
+
+import logging
+import pprint
+import uuid
+from datetime import datetime
+from datetime import timedelta
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Type
+from typing import cast
+
+from pydantic import Field
+
+from saf.models import CollectedEvent
+from saf.models import ForwardConfigBase
+from saf.models import PipelineRunContext
+
+try:
+    from elasticsearch import JsonSerializer
+except ImportError:
+
+    class JsonSerializer:  # type: ignore[no-redef]
+        """
+        Simple class definition in order not to cause errors when elasticsearch fails to import.
+        """
+
+
+log = logging.getLogger(__name__)
+
+
+class AnalyticsJsonSerializer(JsonSerializer):
+    """
+    Custom JSON serializer.
+    """
+
+    def default(self, data: Any) -> Any:  # noqa: ANN101,ANN401
+        """
+        Convert the passed data to a type that can be JSON serialized.
+        """
+        if isinstance(data, set):
+            return list(data)
+        if isinstance(data, datetime):
+            return data.isoformat()
+        if isinstance(data, timedelta):
+            return data.total_seconds()
+        return super().default(data)
+
+
+class ElasticSearchEvent(CollectedEvent):
+    """
+    ElasticSearch Event.
+    """
+
+    index: str
+    id: str = Field(default_factory=lambda: str(uuid.uuid4()))  # noqa: A003
+    data: Dict[str, Any]
+
+
+class ElasticSearchConfig(ForwardConfigBase):
+    """
+    Configuration schema for the ElasticSearch forward plugin.
+    """
+
+    hosts: List[str]
+    username: Optional[str] = None
+    password: Optional[str] = None
+    timeout: int = 10
+    index: Optional[str] = None
+
+
+def get_config_schema() -> Type[ElasticSearchConfig]:
+    """
+    Get the ElasticSearch forwarder configuration schema.
+    """
+    return ElasticSearchConfig
+
+
+async def forward(
+    *,
+    ctx: PipelineRunContext[ElasticSearchConfig],
+    event: ElasticSearchEvent | CollectedEvent,
+) -> None:
+    """
+    Method called to forward the event.
+    """
+    from elasticsearch import AsyncElasticsearch
+
+    if "client" not in ctx.cache:
+        config = ctx.config
+        optional_config = {}
+        if config.username and config.password:
+            optional_config["http_auth"] = (config.username, config.password)
+        ctx.cache["client"] = client = AsyncElasticsearch(
+            hosts=config.hosts,
+            serializers={"application/json": AnalyticsJsonSerializer()},
+            **optional_config,
+        )
+        connection_info = await client.info()
+        log.warning("ES Connection Info:\n%s", pprint.pformat(connection_info))
+    client = ctx.cache["client"]
+    if isinstance(event, ElasticSearchEvent):
+        es_id = event.id
+        es_index = event.index
+    else:
+        es_id = str(uuid.uuid4())
+        if ctx.config.index is None:
+            msg = (
+                "Event to forward is not an instance of ElasticSearchEvent and the "
+                "elasticsearch plugin index configuration is not defined."
+            )
+            raise RuntimeError(msg)
+        es_index = ctx.config.index
+    data = cast(Dict[str, Any], event.data).copy()
+    if "@timestamp" not in data:
+        data["@timestamp"] = event.timestamp
+    log.debug(
+        "ElasticSearch Forward Details:\nindex: %s\nid: %s\ndata:\n%s",
+        es_index,
+        es_id,
+        pprint.pformat(data),
+    )
+    ret = await client.index(index=es_index, id=es_id, body=data)
+    log.debug(
+        "ElasticSearch Forward Response (index: %s; id: %s):\n%s",
+        es_index,
+        es_id,
+        pprint.pformat(ret),
+    )
diff --git a/src/saf/models.py b/src/saf/models.py
index 82dfd44..bd82c86 100644
--- a/src/saf/models.py
+++ b/src/saf/models.py
@@ -109,7 +109,15 @@ def loaded_plugin(self: CCB) -> ModuleType:
         """
         Return the plugin instance(module) for which this configuration refers to.
""" - return PluginsList.instance().collectors[self.plugin] + try: + return PluginsList.instance().collectors[self.plugin] + except KeyError as exc: + log.warning( + "Failed to load %r collector plugin. Available collector plugins: %s", + self.plugin, + list(PluginsList.instance().collectors), + ) + raise exc from None PCB = TypeVar("PCB", bound="ProcessConfigBase") @@ -125,7 +133,15 @@ def loaded_plugin(self: PCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ - return PluginsList.instance().processors[self.plugin] + try: + return PluginsList.instance().processors[self.plugin] + except KeyError as exc: + log.warning( + "Failed to load %r processor plugin. Available processor plugins: %s", + self.plugin, + list(PluginsList.instance().processors), + ) + raise exc from None FCB = TypeVar("FCB", bound="ForwardConfigBase") @@ -141,7 +157,15 @@ def loaded_plugin(self: FCB) -> ModuleType: """ Return the plugin instance(module) for which this configuration refers to. """ - return PluginsList.instance().forwarders[self.plugin] + try: + return PluginsList.instance().forwarders[self.plugin] + except KeyError as exc: + log.warning( + "Failed to load %r forwarder plugin. Available forwarder plugins: %s", + self.plugin, + list(PluginsList.instance().forwarders), + ) + raise exc from None PC = TypeVar("PC", bound="PipelineConfig") diff --git a/src/saf/process/job_aggregate.py b/src/saf/process/job_aggregate.py new file mode 100644 index 0000000..b886247 --- /dev/null +++ b/src/saf/process/job_aggregate.py @@ -0,0 +1,135 @@ +# Copyright 2021-2023 VMware, Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Aggregate the necessary job info into one event to be forwarded. +""" +from __future__ import annotations + +import fnmatch +import logging +import pprint +from typing import TYPE_CHECKING +from typing import AsyncIterator +from typing import Dict +from typing import Set +from typing import Type + +from pydantic import Field + +from saf.collect.event_bus import EventBusCollectedEvent +from saf.collect.grains import GrainsCollectedEvent +from saf.models import CollectedEvent +from saf.models import PipelineRunContext +from saf.models import ProcessConfigBase + +if TYPE_CHECKING: + from datetime import datetime + from datetime import timedelta + +log = logging.getLogger(__name__) + + +class JobAggregateConfig(ProcessConfigBase): + """ + Job aggregate collector configuration. + """ + + jobs: Set[str] = Field(default_factory=lambda: {"*"}) + + +def get_config_schema() -> Type[JobAggregateConfig]: + """ + Get the job aggregate collect plugin configuration schema. + """ + return JobAggregateConfig + + +class JobAggregateCollectedEvent(CollectedEvent): + """ + A collected event with aggregated job run information. + """ + + start_time: datetime + end_time: datetime + duration: timedelta + minion_id: str + grains: Dict[str, str] + + +async def process( + *, + ctx: PipelineRunContext[JobAggregateConfig], + event: EventBusCollectedEvent | GrainsCollectedEvent, +) -> AsyncIterator[CollectedEvent]: + """ + Aggregate received events, otherwise store in cache. 
+    """
+    if "watched_jids" not in ctx.cache:
+        ctx.cache["watched_jids"] = {}
+    if "waiting_for_grains" not in ctx.cache:
+        ctx.cache["waiting_for_grains"] = {}
+    if isinstance(event, EventBusCollectedEvent):
+        log.debug("Event Bus Collected Event:\n%s", pprint.pformat(event.dict()))
+        salt_event = event.salt_event
+        log.debug("Event Bus Collected Salt Event:\n%s", pprint.pformat(salt_event))
+        tag = salt_event.tag
+        data = salt_event.data
+        if fnmatch.fnmatch(tag, "salt/job/*/new"):
+            jid = tag.split("/")[2]
+            # We will probably want to make this condition configurable
+            salt_func = data.get("fun", "")
+            for func_filter in ctx.config.jobs:
+                if fnmatch.fnmatch(salt_func, func_filter):
+                    log.debug(
+                        "The job with JID %r and func %r matched function filter %r",
+                        jid,
+                        salt_func,
+                        func_filter,
+                    )
+                    if jid not in ctx.cache["watched_jids"]:
+                        ctx.cache["watched_jids"][jid] = {
+                            "minions": set(data["minions"]),
+                            "event": salt_event,
+                        }
+                    break
+        elif fnmatch.fnmatch(tag, "salt/job/*/ret/*"):
+            split_tag = tag.split("/")
+            jid = split_tag[2]
+            minion_id = split_tag[-1]
+            if jid in ctx.cache["watched_jids"]:
+                ctx.cache["watched_jids"][jid]["minions"].remove(minion_id)
+                if not ctx.cache["watched_jids"][jid]["minions"]:
+                    # No more minions should return. Remove jid from cache
+                    job_start_event = ctx.cache["watched_jids"].pop(jid)["event"]
+                else:
+                    job_start_event = ctx.cache["watched_jids"][jid]["event"]
+                start_time = job_start_event.stamp
+                end_time = salt_event.stamp
+                duration = end_time - start_time
+                grains = ctx.cache.get("grains", {}).get(minion_id, {})
+                ret = JobAggregateCollectedEvent.construct(
+                    data=data,
+                    start_time=start_time,
+                    end_time=end_time,
+                    duration=duration,
+                    minion_id=minion_id,
+                    grains=grains,
+                )
+                if grains:
+                    yield ret
+                else:
+                    if minion_id not in ctx.cache["waiting_for_grains"]:
+                        ctx.cache["waiting_for_grains"][minion_id] = []
+                    ctx.cache["waiting_for_grains"][minion_id].append(ret)
+            else:
+                log.debug(
+                    "The JID %r was not found in the 'watched_jids' processor cache. Ignoring", jid
+                )
+    elif isinstance(event, GrainsCollectedEvent):
+        if "grains" not in ctx.cache:
+            ctx.cache["grains"] = {}
+        ctx.cache["grains"][event.minion] = event.grains
+        waiting_events = ctx.cache["waiting_for_grains"].pop(event.minion, ())
+        for event_with_grains in waiting_events:
+            event_with_grains.grains = event.grains
+            yield event_with_grains
diff --git a/src/saf/utils/eventbus.py b/src/saf/utils/eventbus.py
index 4c1b631..8eedcbd 100644
--- a/src/saf/utils/eventbus.py
+++ b/src/saf/utils/eventbus.py
@@ -19,12 +19,13 @@
 from saf.models import SaltEvent
 
 if TYPE_CHECKING:
+    from datetime import datetime
     from queue import Queue
 
 log = logging.getLogger(__name__)
 
 
-def _construct_event(event_data: dict[str, Any]) -> SaltEvent | None:
+def _construct_event(tag: str, stamp: datetime, event_data: dict[str, Any]) -> SaltEvent | None:
     """
     Construct a :py:class:`~saf.models.SaltEvent` from a salt event payload.
""" @@ -36,10 +37,15 @@ def _construct_event(event_data: dict[str, Any]) -> SaltEvent | None: for key in list(event_data): if key.startswith("_"): event_data.pop(key) + event_data_value = event_data.get("data") or event_data + if isinstance(event_data_value, str): + data = {"return": event_data_value} + else: + data = event_data_value salt_event = SaltEvent( - tag=event_data["tag"], - stamp=event_raw_data["_stamp"], - data=event_data["data"], + tag=tag, + stamp=stamp, + data=data, raw_data=event_raw_data, ) log.debug("Constructed SaltEvent: %s", salt_event) @@ -80,9 +86,6 @@ def _process_events( for tag in tags: try: if fnmatch.fnmatch(beacon_event_data["tag"], tag): - if "_stamp" not in beacon_event_data: - # Wrapped beacon data usually lack the _stamp key/value pair. Use parent's. - beacon_event_data["_stamp"] = event_data["_stamp"] # Unwrap the nested data key/value pair if needed if "data" in beacon_event_data["data"]: beacon_event_data["data"] = beacon_event_data["data"].pop( @@ -93,7 +96,11 @@ def _process_events( beacon_event_data["tag"], beacon_event_data["data"], ) - salt_event = _construct_event(beacon_event_data) + salt_event = _construct_event( + beacon_event_data["tag"], + event_data["_stamp"], + beacon_event_data, + ) if salt_event: events_queue.put_nowait(salt_event) # We found a matching tag, stop iterating tags @@ -109,7 +116,7 @@ def _process_events( for tag in tags: if fnmatch.fnmatch(event_tag, tag): log.debug("Matching event; TAG: %r DATA: %r", event_tag, event_data) - salt_event = _construct_event(event_data) + salt_event = _construct_event(event_tag, event_data["_stamp"], event_data) if salt_event: events_queue.put_nowait(salt_event) # We found a matching tag, stop iterating tags diff --git a/tools/ci.py b/tools/ci.py index 7699378..fc23610 100644 --- a/tools/ci.py +++ b/tools/ci.py @@ -88,7 +88,12 @@ def download_onedir( tempdir_path = pathlib.Path(tempfile.gettempdir()) with ctx.web: repo_json_file = _download_file(ctx, repo_json_url, tempdir_path / "repo.json") - repo_json_data = json.loads(repo_json_file.read_text()) + repo_json_contents = repo_json_file.read_text() + try: + repo_json_data = json.loads(repo_json_contents) + except json.JSONDecodeError: + ctx.error("Failed to parse JSON from:\n{repo_json_contents}") + ctx.exit(1) ctx.info("Contents of the downloaded 'repo.json' file:") ctx.print(repo_json_data, soft_wrap=True) if salt_version not in repo_json_data: