@@ -62,24 +62,27 @@ async def process(
62
62
salt_event = event .salt_event
63
63
tag = salt_event .tag
64
64
data = salt_event .data
65
+ if "watched_jids" not in ctx .cache :
66
+ ctx .cache ["watched_jids" ] = {}
65
67
if fnmatch .fnmatch (tag , "salt/job/*/new" ):
68
+ jid = tag .split ("/" )[2 ]
66
69
# We will probably want to make this condition configurable
67
- if TYPE_CHECKING :
68
- assert isinstance (salt_event .data , dict )
69
- if data .get ("fun" ) == "state.apply" :
70
- jid = tag .split ("/" )[2 ]
71
- if "watched_jids" not in ctx .cache :
72
- ctx .cache ["watched_jids" ] = {}
73
- # We are going to want a TTL at some point for the watched jids
74
- ctx .cache ["watched_jids" ][jid ] = salt_event
70
+ if jid not in ctx .cache ["watched_jids" ]:
71
+ ctx .cache ["watched_jids" ][jid ] = {
72
+ "minions" : set (salt_event .data ["minions" ]),
73
+ "event" : salt_event ,
74
+ }
75
75
elif fnmatch .fnmatch (tag , "salt/job/*/ret/*" ):
76
76
split_tag = tag .split ("/" )
77
77
jid = split_tag [2 ]
78
- if "watched_jids" not in ctx .cache :
79
- ctx .cache ["watched_jids" ] = {}
78
+ minion_id = split_tag [- 1 ]
80
79
if jid in ctx .cache ["watched_jids" ]:
81
- job_start_event = ctx .cache ["watched_jids" ][jid ]
82
- minion_id = split_tag [- 1 ]
80
+ ctx .cache ["watched_jids" ][jid ]["minions" ].remove (minion_id )
81
+ if not ctx .cache ["watched_jids" ][jid ]["minions" ]:
82
+ # No more minions should return. Remove jid from cache
83
+ job_start_event = ctx .cache ["watched_jids" ].pop (jid )["event" ]
84
+ else :
85
+ job_start_event = ctx .cache ["watched_jids" ][jid ]["event" ]
83
86
start_time = job_start_event .stamp
84
87
end_time = salt_event .stamp
85
88
duration = end_time - start_time
0 commit comments