Skip to content

Commit f68652c

Browse files
committed
Add a jobs_to_es processor
Signed-off-by: Pedro Algarvio <[email protected]>
1 parent bb525da commit f68652c

File tree

3 files changed

+60
-5
lines changed

3 files changed

+60
-5
lines changed

examples/setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ saf.process =
4949
notebook_output = saltext.safexamples.process.notebook_output
5050
numpy_save_keys = saltext.safexamples.process.numpy_save_keys
5151
beacons_to_es = saltext.safexamples.process.beacons_to_es
52+
jobs_to_es = saltext.safexamples.process.jobs_to_es
5253

5354
[bdist_wheel]
5455
# Use this option if your package is pure-python

examples/src/saltext/safexamples/process/beacons_to_es.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from __future__ import annotations
77

88
import logging
9-
import uuid
9+
import pprint
1010
from typing import TYPE_CHECKING
1111
from typing import Any
1212
from typing import AsyncIterator
@@ -45,7 +45,6 @@ async def process(
4545
"""
4646
data = cast(dict[str, Any], event.data).copy()
4747
data["role"] = ctx.info.salt.role
48-
if TYPE_CHECKING:
49-
assert event.timestamp
50-
data["@timestamp"] = event.timestamp.isoformat()
51-
yield ElasticSearchEvent.construct(index=event.beacon, id=str(uuid.uuid4()), data=data)
48+
evt = ElasticSearchEvent.construct(index=event.beacon, data=data)
49+
log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict()))
50+
yield evt
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2021-2023 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""
4+
Convert beacon event to an elastic search index event.
5+
"""
6+
from __future__ import annotations
7+
8+
import json
9+
import logging
10+
import pprint
11+
from typing import TYPE_CHECKING
12+
from typing import AsyncIterator
13+
from typing import Type
14+
15+
from saf.forward.elasticsearch import ElasticSearchEvent
16+
from saf.models import PipelineRunContext
17+
from saf.models import ProcessConfigBase
18+
19+
if TYPE_CHECKING:
20+
from saf.process.job_aggregate import JobAggregateCollectedEvent
21+
22+
log = logging.getLogger(__name__)
23+
24+
25+
class SaltJobsToESConfig(ProcessConfigBase):
26+
"""
27+
Processor configuration.
28+
"""
29+
30+
31+
def get_config_schema() -> Type[SaltJobsToESConfig]:
32+
"""
33+
Get the test collect plugin configuration schema.
34+
"""
35+
return SaltJobsToESConfig
36+
37+
38+
async def process(
39+
*,
40+
ctx: PipelineRunContext[SaltJobsToESConfig], # noqa: ARG001
41+
event: JobAggregateCollectedEvent,
42+
) -> AsyncIterator[ElasticSearchEvent]:
43+
"""
44+
Method called to collect events, in this case, generate.
45+
"""
46+
data = event.dict()
47+
data.pop("data", None)
48+
data.update(event.data)
49+
# Have the return field always be a JSON string
50+
if "return" in data:
51+
data["return"] = json.dumps(data["return"])
52+
data["@timestamp"] = event.start_time
53+
evt = ElasticSearchEvent.construct(index="salt_jobs", data=data)
54+
log.debug("ElasticSearchEvent: %s", pprint.pformat(evt.dict()))
55+
yield evt

0 commit comments

Comments
 (0)