Skip to content

Commit f129285

Browse files
committed
Fix the Elasticsearch Docker example after the grains include update
Signed-off-by: Pedro Algarvio <[email protected]>
1 parent 56acb4b commit f129285

File tree

4 files changed

+41
-22
lines changed

4 files changed

+41
-22
lines changed

docker/elastic/conf/analytics.master.conf

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ analytics:
99
tags:
1010
- "salt/job/*"
1111

12+
grains-collector:
13+
plugin: grains
14+
interval: 30
15+
grains:
16+
- "os"
17+
- "id"
18+
- "role"
19+
- "datacenter"
20+
1221
processors:
1322
job-aggregate:
1423
plugin: job-aggregate
@@ -24,7 +33,9 @@ analytics:
2433
pipelines:
2534

2635
jobs-pipeline:
27-
collect: events-collector
36+
collect:
37+
- grains-collector
38+
- events-collector
2839
process:
2940
- job-aggregate
3041
- cast-to-es

src/saf/collect/event_bus.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,4 @@ async def collect(
5252
salt_event: SaltEvent
5353
log.info("The event bus collect plugin is configured to listen to tags: %s", config.tags)
5454
async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=config.tags):
55-
yield EventBusCollectedEvent(salt_event=salt_event, data={"tag": salt_event.tag})
55+
yield EventBusCollectedEvent.construct(salt_event=salt_event, data={"tag": salt_event.tag})

src/saf/collect/grains.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77

88
import asyncio
99
import logging
10+
from typing import Any
1011
from typing import AsyncIterator
1112
from typing import Dict
1213
from typing import List
1314
from typing import Type
1415

16+
from pydantic import Field
1517
from salt.client import LocalClient
1618

1719
from saf.models import CollectConfigBase
@@ -26,9 +28,9 @@ class GrainsConfig(CollectConfigBase):
2628
Configuration schema for the grains collect plugin.
2729
"""
2830

29-
targets: str = "*"
31+
targets: str = Field(default="*")
3032
grains: List[str]
31-
interval: float = 5
33+
interval: float = Field(default=5)
3234

3335

3436
def get_config_schema() -> Type[GrainsConfig]:
@@ -44,7 +46,7 @@ class GrainsCollectedEvent(CollectedEvent):
4446
"""
4547

4648
minion: str
47-
grains: Dict[str, str]
49+
grains: Dict[str, Any]
4850

4951

5052
async def collect(*, ctx: PipelineRunContext[GrainsConfig]) -> AsyncIterator[GrainsCollectedEvent]:
@@ -58,6 +60,6 @@ async def collect(*, ctx: PipelineRunContext[GrainsConfig]) -> AsyncIterator[Gra
5860
ret = client.cmd(config.targets, "grains.item", arg=config.grains)
5961
for minion, grains in ret.items():
6062
if grains:
61-
event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains)
63+
event = GrainsCollectedEvent.construct(data=ret, minion=minion, grains=grains)
6264
yield event
6365
await asyncio.sleep(config.interval)

src/saf/process/job_aggregate.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class JobAggregateConfig(ProcessConfigBase):
3434
Job aggregate collector configuration.
3535
"""
3636

37-
jobs: Set[str] = Field(default_factory=set)
37+
jobs: Set[str] = Field(default_factory=lambda: {"*"})
3838

3939

4040
def get_config_schema() -> Type[JobAggregateConfig]:
@@ -59,7 +59,7 @@ class JobAggregateCollectedEvent(CollectedEvent):
5959
async def process(
6060
*,
6161
ctx: PipelineRunContext[JobAggregateConfig],
62-
event: CollectedEvent,
62+
event: EventBusCollectedEvent | GrainsCollectedEvent,
6363
) -> AsyncIterator[CollectedEvent]:
6464
"""
6565
Aggregate received events, otherwise store in cache.
@@ -72,15 +72,20 @@ async def process(
7272
data = salt_event.data
7373
if "watched_jids" not in ctx.cache:
7474
ctx.cache["watched_jids"] = {}
75+
if "waiting_for_grains" not in ctx.cache:
76+
ctx.cache["waiting_for_grains"] = {}
7577
if fnmatch.fnmatch(tag, "salt/job/*/new"):
7678
jid = tag.split("/")[2]
7779
# We will probably want to make this condition configurable
7880
salt_func = data.get("fun", "")
79-
matching_jobs = ctx.config.jobs
80-
if not matching_jobs:
81-
matching_jobs.add("*")
82-
for func_filter in matching_jobs:
81+
for func_filter in ctx.config.jobs:
8382
if fnmatch.fnmatch(salt_func, func_filter):
83+
log.debug(
84+
"The job with JID %r and func %r matched function filter %r",
85+
jid,
86+
salt_func,
87+
func_filter,
88+
)
8489
if jid not in ctx.cache["watched_jids"]:
8590
ctx.cache["watched_jids"][jid] = {
8691
"minions": set(data["minions"]),
@@ -113,17 +118,18 @@ async def process(
113118
if grains:
114119
yield ret
115120
else:
116-
if "waiting_for_grains" not in ctx.cache:
117-
ctx.cache["waiting_for_grains"] = set()
118-
ctx.cache["waiting_for_grains"].add(ret)
121+
if minion_id not in ctx.cache["waiting_for_grains"]:
122+
ctx.cache["waiting_for_grains"][minion_id] = []
123+
ctx.cache["waiting_for_grains"][minion_id].append(ret)
124+
else:
125+
log.debug(
126+
"The JID %r was not found in the 'watched_jids' processor cache. Ignoring", jid
127+
)
119128
elif isinstance(event, GrainsCollectedEvent):
120129
if "grains" not in ctx.cache:
121130
ctx.cache["grains"] = {}
122131
ctx.cache["grains"][event.minion] = event.grains
123-
waiting = ctx.cache.get("waiting_for_grains")
124-
if waiting:
125-
to_remove = [agg_event for agg_event in waiting if agg_event.minion_id == event.minion]
126-
for event_with_grains in to_remove:
127-
event_with_grains.grains = event.grains
128-
waiting.remove(event_with_grains)
129-
yield event_with_grains
132+
waiting_events = ctx.cache["waiting_for_grains"].pop(event.minion, ())
133+
for event_with_grains in waiting_events:
134+
event_with_grains.grains = event.grains
135+
yield event_with_grains

0 commit comments

Comments
 (0)