10
10
from typing import TYPE_CHECKING
11
11
from typing import AsyncIterator
12
12
from typing import Dict
13
+ from typing import Set
13
14
from typing import Type
14
15
15
16
from saf .collect .event_bus import EventBusCollectedEvent
25
26
log = logging .getLogger (__name__ )
26
27
27
28
28
- class StateAggregateConfig (ProcessConfigBase ):
29
+ class JobAggregateConfig (ProcessConfigBase ):
29
30
"""
30
31
Job aggregate collector configuration.
31
32
"""
32
33
34
+ jobs : Set [str ]
33
35
34
- def get_config_schema () -> Type [StateAggregateConfig ]:
36
+
37
+ def get_config_schema () -> Type [JobAggregateConfig ]:
35
38
"""
36
39
Get the job aggregate collect plugin configuration schema.
37
40
"""
38
- return StateAggregateConfig
41
+ return JobAggregateConfig
39
42
40
43
41
- class StateAggregateCollectedEvent (CollectedEvent ):
44
+ class JobAggregateCollectedEvent (CollectedEvent ):
42
45
"""
43
- A collected event with aggregated state run information.
46
+ A collected event with aggregated job run information.
44
47
"""
45
48
46
49
start_time : datetime
@@ -52,7 +55,7 @@ class StateAggregateCollectedEvent(CollectedEvent):
52
55
53
56
async def process (
54
57
* ,
55
- ctx : PipelineRunContext [StateAggregateConfig ],
58
+ ctx : PipelineRunContext [JobAggregateConfig ],
56
59
event : CollectedEvent ,
57
60
) -> AsyncIterator [CollectedEvent ]:
58
61
"""
@@ -67,11 +70,15 @@ async def process(
67
70
if fnmatch .fnmatch (tag , "salt/job/*/new" ):
68
71
jid = tag .split ("/" )[2 ]
69
72
# We will probably want to make this condition configurable
70
- if jid not in ctx .cache ["watched_jids" ]:
71
- ctx .cache ["watched_jids" ][jid ] = {
72
- "minions" : set (salt_event .data ["minions" ]),
73
- "event" : salt_event ,
74
- }
73
+ salt_func = data .get ("fun" , "" )
74
+ for func_filter in ctx .config .jobs :
75
+ if fnmatch .fnmatch (salt_func , func_filter ):
76
+ if jid not in ctx .cache ["watched_jids" ]:
77
+ ctx .cache ["watched_jids" ][jid ] = {
78
+ "minions" : set (data ["minions" ]),
79
+ "event" : salt_event ,
80
+ }
81
+ break
75
82
elif fnmatch .fnmatch (tag , "salt/job/*/ret/*" ):
76
83
split_tag = tag .split ("/" )
77
84
jid = split_tag [2 ]
@@ -87,7 +94,7 @@ async def process(
87
94
end_time = salt_event .stamp
88
95
duration = end_time - start_time
89
96
grains = ctx .cache .get ("grains" , {}).get (minion_id , {})
90
- ret = StateAggregateCollectedEvent .construct (
97
+ ret = JobAggregateCollectedEvent .construct (
91
98
data = data ,
92
99
start_time = start_time ,
93
100
end_time = end_time ,
0 commit comments