Skip to content

Commit d5acd76

Browse files
MKLebs0undt3ch
authored andcommitted
Add grains collector and state aggregate processor
1 parent 001768a commit d5acd76

File tree

3 files changed

+43
-13
lines changed

3 files changed

+43
-13
lines changed

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ saf.collect =
5353
event-bus = saf.collect.event_bus
5454
file = saf.collect.file
5555
salt_exec = saf.collect.salt_exec
56-
salt_cmd = saf.collect.salt_cmd
56+
grains = saf.collect.grains
5757
test = saf.collect.test
5858
saf.process =
5959
regex_mask = saf.process.regex_mask

src/saf/collect/salt_cmd.py renamed to src/saf/collect/grains.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,9 @@ class SaltCommandConfig(CollectConfigBase):
2626
Configuration schema for the beacons collect plugin.
2727
"""
2828

29-
targets: str
30-
cmd: str
31-
args: List[str] | None
32-
kwargs: Dict[str, str] | None
33-
interval: float = 5
34-
cache_flag: str | None = None
29+
targets: str = "*"
30+
grains: List[str]
31+
interval: float = 20
3532

3633

3734
def get_config_schema() -> Type[SaltCommandConfig]:
@@ -41,16 +38,27 @@ def get_config_schema() -> Type[SaltCommandConfig]:
4138
return SaltCommandConfig
4239

4340

44-
async def collect(*, ctx: PipelineRunContext[SaltCommandConfig]) -> AsyncIterator[CollectedEvent]:
41+
class GrainsCollectedEvent(CollectedEvent):
42+
"""
43+
A collected event surrounding a SaltEvent.
44+
"""
45+
46+
minion: str
47+
grains: Dict[str, str]
48+
49+
50+
async def collect(
51+
*, ctx: PipelineRunContext[SaltCommandConfig]
52+
) -> AsyncIterator[GrainsCollectedEvent]:
4553
"""
4654
Method called to collect events.
4755
"""
4856
config = ctx.config
4957
client = LocalClient(mopts=ctx.salt_config.copy())
5058

5159
while True:
52-
ret = client.cmd(config.targets, config.cmd, arg=config.args, kwarg=config.kwargs)
53-
event = CollectedEvent(data={config._name: ret}) # noqa: SLF001
54-
log.debug("CollectedEvent: %s", event)
55-
yield event
60+
ret = client.cmd(config.targets, "grains.item", arg=config.grains)
61+
for minion, grains in ret.items():
62+
event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains)
63+
yield event
5664
await asyncio.sleep(config.interval)

src/saf/process/state_aggregate.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import logging
1010
from typing import TYPE_CHECKING
1111
from typing import AsyncIterator
12+
from typing import Dict
1213
from typing import Type
1314

1415
from saf.collect.event_bus import EventBusCollectedEvent
16+
from saf.collect.grains import GrainsCollectedEvent
1517
from saf.models import CollectedEvent
1618
from saf.models import PipelineRunContext
1719
from saf.models import ProcessConfigBase
@@ -45,6 +47,7 @@ class StateAggregateCollectedEvent(CollectedEvent):
4547
end_time: datetime
4648
duration: timedelta
4749
minion_id: str
50+
grains: Dict[str, str]
4851

4952

5053
async def process(
@@ -80,10 +83,29 @@ async def process(
8083
start_time = job_start_event.stamp
8184
end_time = salt_event.stamp
8285
duration = end_time - start_time
83-
yield StateAggregateCollectedEvent.construct(
86+
grains = ctx.cache.get("grains", {}).get(minion_id, {})
87+
ret = StateAggregateCollectedEvent.construct(
8488
data=data,
8589
start_time=start_time,
8690
end_time=end_time,
8791
duration=duration,
8892
minion_id=minion_id,
93+
grains=grains,
8994
)
95+
if grains:
96+
yield ret
97+
else:
98+
if "waiting_for_grains" not in ctx.cache:
99+
ctx.cache["waiting_for_grains"] = set()
100+
ctx.cache["waiting_for_grains"].add(ret)
101+
elif isinstance(event, GrainsCollectedEvent):
102+
if "grains" not in ctx.cache:
103+
ctx.cache["grains"] = {}
104+
ctx.cache["grains"][event.minion] = event.grains
105+
waiting = ctx.cache.get("waiting_for_grains")
106+
if waiting:
107+
to_remove = [agg_event for agg_event in waiting if agg_event.minion_id == event.minion]
108+
for event_with_grains in to_remove:
109+
event_with_grains.grains = event.grains
110+
waiting.remove(event_with_grains)
111+
yield event_with_grains

0 commit comments

Comments
 (0)