Skip to content

Commit 001768a

Browse files
MKLebs0undt3ch
authored andcommitted
Initial check in for job aggregations
1 parent 46e5efd commit 001768a

File tree

5 files changed

+209
-1
lines changed

5 files changed

+209
-1
lines changed

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,16 @@ salt.loader =
5050
salt-analytics-framework = saf.saltext
5151
saf.collect =
5252
beacons = saf.collect.beacons
53+
event-bus = saf.collect.event_bus
5354
file = saf.collect.file
5455
salt_exec = saf.collect.salt_exec
56+
salt_cmd = saf.collect.salt_cmd
5557
test = saf.collect.test
5658
saf.process =
5759
regex_mask = saf.process.regex_mask
5860
shannon_mask = saf.process.shannon_mask
5961
jupyter_notebook = saf.process.jupyter_notebook
62+
state-aggregate = saf.process.state_aggregate
6063
test = saf.process.test
6164
saf.forward =
6265
disk = saf.forward.disk

src/saf/collect/event_bus.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2021-2023 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""
4+
Collect events from the event bus.
5+
"""
6+
from __future__ import annotations
7+
8+
import logging
9+
from typing import AsyncIterator
10+
from typing import Set
11+
from typing import Type
12+
13+
from saf.models import CollectConfigBase
14+
from saf.models import CollectedEvent
15+
from saf.models import PipelineRunContext
16+
from saf.models import SaltEvent
17+
from saf.utils import eventbus
18+
19+
log = logging.getLogger(__name__)
20+
21+
22+
class EventBusConfig(CollectConfigBase):
23+
"""
24+
Configuration schema for the beacons collect plugin.
25+
"""
26+
27+
tags: Set[str]
28+
29+
30+
def get_config_schema() -> Type[EventBusConfig]:
31+
"""
32+
Get the event bus collect plugin configuration schema.
33+
"""
34+
return EventBusConfig
35+
36+
37+
class EventBusCollectedEvent(CollectedEvent):
38+
"""
39+
A collected event surrounding a SaltEvent.
40+
"""
41+
42+
salt_event: SaltEvent
43+
44+
45+
async def collect(
46+
*, ctx: PipelineRunContext[EventBusConfig]
47+
) -> AsyncIterator[EventBusCollectedEvent]:
48+
"""
49+
Method called to collect events.
50+
"""
51+
config = ctx.config
52+
salt_event: SaltEvent
53+
log.info("The event bus collect plugin is configured to listen to tags: %s", config.tags)
54+
async for salt_event in eventbus.iter_events(opts=ctx.salt_config.copy(), tags=config.tags):
55+
yield EventBusCollectedEvent(salt_event=salt_event, data={"tag": salt_event.tag})

src/saf/collect/salt_cmd.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright 2021-2023 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""
4+
Collect events from the event bus.
5+
"""
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import logging
10+
from typing import AsyncIterator
11+
from typing import Dict
12+
from typing import List
13+
from typing import Type
14+
15+
from salt.client import LocalClient
16+
17+
from saf.models import CollectConfigBase
18+
from saf.models import CollectedEvent
19+
from saf.models import PipelineRunContext
20+
21+
log = logging.getLogger(__name__)
22+
23+
24+
class SaltCommandConfig(CollectConfigBase):
25+
"""
26+
Configuration schema for the beacons collect plugin.
27+
"""
28+
29+
targets: str
30+
cmd: str
31+
args: List[str] | None
32+
kwargs: Dict[str, str] | None
33+
interval: float = 5
34+
cache_flag: str | None = None
35+
36+
37+
def get_config_schema() -> Type[SaltCommandConfig]:
38+
"""
39+
Get the event bus collect plugin configuration schema.
40+
"""
41+
return SaltCommandConfig
42+
43+
44+
async def collect(*, ctx: PipelineRunContext[SaltCommandConfig]) -> AsyncIterator[CollectedEvent]:
45+
"""
46+
Method called to collect events.
47+
"""
48+
config = ctx.config
49+
client = LocalClient(mopts=ctx.salt_config.copy())
50+
51+
while True:
52+
ret = client.cmd(config.targets, config.cmd, arg=config.args, kwarg=config.kwargs)
53+
event = CollectedEvent(data={config._name: ret}) # noqa: SLF001
54+
log.debug("CollectedEvent: %s", event)
55+
yield event
56+
await asyncio.sleep(config.interval)

src/saf/process/state_aggregate.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright 2021-2023 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""
4+
Aggregate the necessary job info into one event to be forwarded.
5+
"""
6+
from __future__ import annotations
7+
8+
import fnmatch
9+
import logging
10+
from typing import TYPE_CHECKING
11+
from typing import AsyncIterator
12+
from typing import Type
13+
14+
from saf.collect.event_bus import EventBusCollectedEvent
15+
from saf.models import CollectedEvent
16+
from saf.models import PipelineRunContext
17+
from saf.models import ProcessConfigBase
18+
19+
if TYPE_CHECKING:
20+
from datetime import datetime
21+
from datetime import timedelta
22+
23+
log = logging.getLogger(__name__)
24+
25+
26+
class StateAggregateConfig(ProcessConfigBase):
27+
"""
28+
Job aggregate collector configuration.
29+
"""
30+
31+
32+
def get_config_schema() -> Type[StateAggregateConfig]:
33+
"""
34+
Get the job aggregate collect plugin configuration schema.
35+
"""
36+
return StateAggregateConfig
37+
38+
39+
class StateAggregateCollectedEvent(CollectedEvent):
40+
"""
41+
A collected event with aggregated state run information.
42+
"""
43+
44+
start_time: datetime
45+
end_time: datetime
46+
duration: timedelta
47+
minion_id: str
48+
49+
50+
async def process(
51+
*,
52+
ctx: PipelineRunContext[StateAggregateConfig],
53+
event: CollectedEvent,
54+
) -> AsyncIterator[CollectedEvent]:
55+
"""
56+
Aggregate received events, otherwise store in cache.
57+
"""
58+
if isinstance(event, EventBusCollectedEvent):
59+
salt_event = event.salt_event
60+
tag = salt_event.tag
61+
data = salt_event.data
62+
if fnmatch.fnmatch(tag, "salt/job/*/new"):
63+
# We will probably want to make this condition configurable
64+
if TYPE_CHECKING:
65+
assert isinstance(salt_event.data, dict)
66+
if data.get("fun") == "state.apply":
67+
jid = tag.split("/")[2]
68+
if "watched_jids" not in ctx.cache:
69+
ctx.cache["watched_jids"] = {}
70+
# We are going to want a TTL at some point for the watched jids
71+
ctx.cache["watched_jids"][jid] = salt_event
72+
elif fnmatch.fnmatch(tag, "salt/job/*/ret/*"):
73+
split_tag = tag.split("/")
74+
jid = split_tag[2]
75+
if "watched_jids" not in ctx.cache:
76+
ctx.cache["watched_jids"] = {}
77+
if jid in ctx.cache["watched_jids"]:
78+
job_start_event = ctx.cache["watched_jids"][jid]
79+
minion_id = split_tag[-1]
80+
start_time = job_start_event.stamp
81+
end_time = salt_event.stamp
82+
duration = end_time - start_time
83+
yield StateAggregateCollectedEvent.construct(
84+
data=data,
85+
start_time=start_time,
86+
end_time=end_time,
87+
duration=duration,
88+
minion_id=minion_id,
89+
)

src/saf/utils/eventbus.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,15 @@ def _construct_event(tag: str, stamp: datetime, event_data: dict[str, Any]) -> S
3737
for key in list(event_data):
3838
if key.startswith("_"):
3939
event_data.pop(key)
40+
event_data_value = event_data.get("data") or event_data
41+
if isinstance(event_data_value, str):
42+
data = {"return": event_data_value}
43+
else:
44+
data = event_data_value
4045
salt_event = SaltEvent(
4146
tag=tag,
4247
stamp=stamp,
43-
data=event_data.get("data") or event_data,
48+
data=data,
4449
raw_data=event_raw_data,
4550
)
4651
log.debug("Constructed SaltEvent: %s", salt_event)

0 commit comments

Comments
 (0)