Commit 1a5f5ab

MKLeb authored and s0undt3ch committed
Make some additional improvements to the JobAggregate Processor
1 parent 248fa1c commit 1a5f5ab

2 files changed: +27 −21 lines changed

src/saf/collect/grains.py

Lines changed: 8 additions & 9 deletions
@@ -21,21 +21,21 @@
 log = logging.getLogger(__name__)
 
 
-class SaltCommandConfig(CollectConfigBase):
+class GrainsConfig(CollectConfigBase):
     """
     Configuration schema for the beacons collect plugin.
     """
 
     targets: str = "*"
     grains: List[str]
-    interval: float = 20
+    interval: float = 5
 
 
-def get_config_schema() -> Type[SaltCommandConfig]:
+def get_config_schema() -> Type[GrainsConfig]:
     """
     Get the event bus collect plugin configuration schema.
     """
-    return SaltCommandConfig
+    return GrainsConfig
 
 
 class GrainsCollectedEvent(CollectedEvent):
@@ -47,9 +47,7 @@ class GrainsCollectedEvent(CollectedEvent):
     grains: Dict[str, str]
 
 
-async def collect(
-    *, ctx: PipelineRunContext[SaltCommandConfig]
-) -> AsyncIterator[GrainsCollectedEvent]:
+async def collect(*, ctx: PipelineRunContext[GrainsConfig]) -> AsyncIterator[GrainsCollectedEvent]:
     """
     Method called to collect events.
     """
@@ -59,6 +57,7 @@ async def collect(
     while True:
         ret = client.cmd(config.targets, "grains.item", arg=config.grains)
         for minion, grains in ret.items():
-            event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains)
-            yield event
+            if grains:
+                event = GrainsCollectedEvent(data=ret, minion=minion, grains=grains)
+                yield event
         await asyncio.sleep(config.interval)
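
The practical effect of the last hunk is that a minion which returns an empty grains mapping no longer produces an empty GrainsCollectedEvent. A minimal sketch of that guard in isolation, using made-up data and a hypothetical helper name rather than the plugin's real API:

```python
# Hypothetical helper illustrating the `if grains:` guard added above.
# The return shape mirrors a Salt LocalClient cmd() result: a dict
# mapping minion IDs to the grains each minion reported.
from typing import Dict, Iterator, Tuple


def filter_grain_returns(
    ret: Dict[str, Dict[str, str]]
) -> Iterator[Tuple[str, Dict[str, str]]]:
    """Yield (minion, grains) pairs, skipping minions that returned nothing."""
    for minion, grains in ret.items():
        if grains:  # an empty dict means no requested grains came back
            yield minion, grains


# Example data (made up): minion-2 reported no grains, so it is skipped.
fake_ret = {
    "minion-1": {"os": "Ubuntu", "osrelease": "22.04"},
    "minion-2": {},
}
assert list(filter_grain_returns(fake_ret)) == [
    ("minion-1", {"os": "Ubuntu", "osrelease": "22.04"})
]
```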

src/saf/process/job_aggregate.py

Lines changed: 19 additions & 12 deletions
@@ -10,6 +10,7 @@
 from typing import TYPE_CHECKING
 from typing import AsyncIterator
 from typing import Dict
+from typing import Set
 from typing import Type
 
 from saf.collect.event_bus import EventBusCollectedEvent
@@ -25,22 +26,24 @@
 log = logging.getLogger(__name__)
 
 
-class StateAggregateConfig(ProcessConfigBase):
+class JobAggregateConfig(ProcessConfigBase):
     """
     Job aggregate collector configuration.
     """
 
+    jobs: Set[str]
 
-def get_config_schema() -> Type[StateAggregateConfig]:
+
+def get_config_schema() -> Type[JobAggregateConfig]:
     """
     Get the job aggregate collect plugin configuration schema.
     """
-    return StateAggregateConfig
+    return JobAggregateConfig
 
 
-class StateAggregateCollectedEvent(CollectedEvent):
+class JobAggregateCollectedEvent(CollectedEvent):
     """
-    A collected event with aggregated state run information.
+    A collected event with aggregated job run information.
     """
 
     start_time: datetime
@@ -52,7 +55,7 @@ class StateAggregateCollectedEvent(CollectedEvent):
 
 async def process(
     *,
-    ctx: PipelineRunContext[StateAggregateConfig],
+    ctx: PipelineRunContext[JobAggregateConfig],
     event: CollectedEvent,
 ) -> AsyncIterator[CollectedEvent]:
     """
@@ -67,11 +70,15 @@ async def process(
     if fnmatch.fnmatch(tag, "salt/job/*/new"):
         jid = tag.split("/")[2]
         # We will probably want to make this condition configurable
-        if jid not in ctx.cache["watched_jids"]:
-            ctx.cache["watched_jids"][jid] = {
-                "minions": set(salt_event.data["minions"]),
-                "event": salt_event,
-            }
+        salt_func = data.get("fun", "")
+        for func_filter in ctx.config.jobs:
+            if fnmatch.fnmatch(salt_func, func_filter):
+                if jid not in ctx.cache["watched_jids"]:
+                    ctx.cache["watched_jids"][jid] = {
+                        "minions": set(data["minions"]),
+                        "event": salt_event,
+                    }
+                break
     elif fnmatch.fnmatch(tag, "salt/job/*/ret/*"):
         split_tag = tag.split("/")
         jid = split_tag[2]
@@ -87,7 +94,7 @@ async def process(
         end_time = salt_event.stamp
         duration = end_time - start_time
         grains = ctx.cache.get("grains", {}).get(minion_id, {})
-        ret = StateAggregateCollectedEvent.construct(
+        ret = JobAggregateCollectedEvent.construct(
             data=data,
             start_time=start_time,
             end_time=end_time,
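
The key behavioural change in this file is the new `jobs` setting: a `salt/job/*/new` event is only cached for aggregation when the job's function matches one of the configured fnmatch patterns. A minimal sketch of that matching logic, with a hypothetical helper name and example patterns that are not taken from the commit:

```python
# Hypothetical helper mirroring the fnmatch-based filter added above.
# The pattern set corresponds to the new `jobs: Set[str]` config field.
import fnmatch
from typing import Set


def job_is_watched(salt_func: str, jobs: Set[str]) -> bool:
    """Return True if the job's function name matches any configured pattern."""
    return any(fnmatch.fnmatch(salt_func, pattern) for pattern in jobs)


# Example patterns (made up for illustration).
patterns = {"state.apply", "test.*"}
assert job_is_watched("state.apply", patterns) is True
assert job_is_watched("test.ping", patterns) is True
assert job_is_watched("grains.items", patterns) is False
```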
