Skip to content

Commit d9fb5f8

Browse files
authored
fix(rules): automatic refire with backoff (#732)
1 parent 9ca5669 commit d9fb5f8

File tree

2 files changed

+360
-177
lines changed

2 files changed

+360
-177
lines changed
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Copyright The Cryostat Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cryostat.rules;
17+
18+
import java.time.Duration;
19+
import java.util.Date;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
import java.util.Optional;
24+
import java.util.concurrent.CopyOnWriteArrayList;
25+
import java.util.stream.Collectors;
26+
27+
import org.openjdk.jmc.common.unit.QuantityConversionException;
28+
29+
import io.cryostat.ConfigProperties;
30+
import io.cryostat.expressions.MatchExpressionEvaluator;
31+
import io.cryostat.libcryostat.templates.Template;
32+
import io.cryostat.libcryostat.templates.TemplateType;
33+
import io.cryostat.recordings.ActiveRecording;
34+
import io.cryostat.recordings.RecordingHelper;
35+
import io.cryostat.recordings.RecordingHelper.RecordingOptions;
36+
import io.cryostat.recordings.RecordingHelper.RecordingReplace;
37+
import io.cryostat.rules.Rule.RuleEvent;
38+
import io.cryostat.rules.RuleService.ActivationAttempt;
39+
import io.cryostat.targets.Target;
40+
import io.cryostat.targets.Target.TargetDiscovery;
41+
42+
import io.quarkus.runtime.ShutdownEvent;
43+
import io.quarkus.vertx.ConsumeEvent;
44+
import jakarta.enterprise.context.ApplicationScoped;
45+
import jakarta.enterprise.event.Observes;
46+
import jakarta.inject.Inject;
47+
import jakarta.transaction.Transactional;
48+
import org.apache.commons.lang3.tuple.Pair;
49+
import org.eclipse.microprofile.config.inject.ConfigProperty;
50+
import org.jboss.logging.Logger;
51+
import org.quartz.JobBuilder;
52+
import org.quartz.JobDetail;
53+
import org.quartz.JobKey;
54+
import org.quartz.Scheduler;
55+
import org.quartz.SchedulerException;
56+
import org.quartz.SimpleScheduleBuilder;
57+
import org.quartz.Trigger;
58+
import org.quartz.TriggerBuilder;
59+
60+
@ApplicationScoped
61+
public class RuleExecutor {
62+
63+
@Inject Logger logger;
64+
@Inject RecordingHelper recordingHelper;
65+
@Inject MatchExpressionEvaluator evaluator;
66+
@Inject Scheduler quartz;
67+
68+
@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
69+
Duration connectionFailedTimeout;
70+
71+
private final List<JobKey> jobs = new CopyOnWriteArrayList<>();
72+
73+
void onStop(@Observes ShutdownEvent evt) throws SchedulerException {
74+
quartz.shutdown();
75+
}
76+
77+
@ConsumeEvent(blocking = true)
78+
@Transactional
79+
void onMessage(ActivationAttempt attempt) throws QuantityConversionException {
80+
Target attachedTarget = Target.<Target>find("id", attempt.target().id).singleResult();
81+
recordingHelper
82+
.getActiveRecording(
83+
attachedTarget,
84+
r -> Objects.equals(r.name, attempt.rule().getRecordingName()))
85+
.ifPresent(
86+
rec -> {
87+
try {
88+
recordingHelper
89+
.stopRecording(rec)
90+
.await()
91+
.atMost(connectionFailedTimeout);
92+
} catch (Exception e) {
93+
logger.warn(e);
94+
}
95+
});
96+
97+
Pair<String, TemplateType> pair =
98+
recordingHelper.parseEventSpecifier(attempt.rule().eventSpecifier);
99+
Template template =
100+
recordingHelper.getPreferredTemplate(
101+
attempt.target(), pair.getKey(), pair.getValue());
102+
103+
ActiveRecording recording =
104+
recordingHelper
105+
.startRecording(
106+
attachedTarget,
107+
RecordingReplace.STOPPED,
108+
template,
109+
createRecordingOptions(attempt.rule()),
110+
Map.of("rule", attempt.rule().name))
111+
.await()
112+
.atMost(Duration.ofSeconds(10));
113+
114+
if (attempt.rule().isArchiver()) {
115+
scheduleArchival(attempt.rule(), attachedTarget, recording);
116+
}
117+
}
118+
119+
@ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true)
120+
void onMessage(TargetDiscovery event) {
121+
switch (event.kind()) {
122+
case LOST:
123+
for (var jk : jobs) {
124+
if (Objects.equals(event.serviceRef().jvmId, jk.getGroup())) {
125+
try {
126+
quartz.deleteJob(jk);
127+
} catch (SchedulerException e) {
128+
logger.errorv(
129+
"Failed to delete job {0} due to loss of target {1}",
130+
jk.getName(), event.serviceRef().connectUrl);
131+
} finally {
132+
jobs.remove(jk);
133+
}
134+
}
135+
}
136+
break;
137+
default:
138+
break;
139+
}
140+
}
141+
142+
@ConsumeEvent(value = Rule.RULE_ADDRESS, blocking = true)
143+
@Transactional
144+
public void handleRuleModification(RuleEvent event) {
145+
Rule rule = event.rule();
146+
switch (event.category()) {
147+
case UPDATED:
148+
if (!rule.enabled) {
149+
cancelTasksForRule(rule);
150+
}
151+
break;
152+
case DELETED:
153+
cancelTasksForRule(rule);
154+
break;
155+
default:
156+
break;
157+
}
158+
}
159+
160+
@ConsumeEvent(value = Rule.RULE_ADDRESS + "?clean", blocking = true)
161+
@Transactional
162+
public void handleRuleRecordingCleanup(Rule rule) {
163+
cancelTasksForRule(rule);
164+
var targets =
165+
evaluator.getMatchedTargets(rule.matchExpression).stream()
166+
.collect(Collectors.toList());
167+
for (var target : targets) {
168+
recordingHelper
169+
.getActiveRecording(
170+
target, r -> Objects.equals(r.name, rule.getRecordingName()))
171+
.ifPresent(
172+
recording -> {
173+
try {
174+
recordingHelper
175+
.stopRecording(recording)
176+
.await()
177+
.atMost(connectionFailedTimeout);
178+
} catch (Exception e) {
179+
logger.warn(e);
180+
}
181+
});
182+
}
183+
}
184+
185+
private void cancelTasksForRule(Rule rule) {
186+
if (rule.isArchiver()) {
187+
List<String> targets =
188+
evaluator.getMatchedTargets(rule.matchExpression).stream()
189+
.map(t -> t.jvmId)
190+
.collect(Collectors.toList());
191+
for (var jk : jobs) {
192+
if (targets.contains(jk.getGroup())) {
193+
try {
194+
quartz.deleteJob(jk);
195+
} catch (SchedulerException e) {
196+
logger.error(
197+
"Failed to delete job " + jk.getName() + " for rule " + rule.name);
198+
} finally {
199+
jobs.remove(jk);
200+
}
201+
}
202+
}
203+
}
204+
}
205+
206+
private RecordingOptions createRecordingOptions(Rule rule) {
207+
return new RecordingOptions(
208+
rule.getRecordingName(),
209+
Optional.of(true),
210+
Optional.of(true),
211+
Optional.empty(),
212+
Optional.ofNullable((long) rule.maxSizeBytes),
213+
Optional.ofNullable((long) rule.maxAgeSeconds));
214+
}
215+
216+
private void scheduleArchival(Rule rule, Target target, ActiveRecording recording) {
217+
JobDetail jobDetail =
218+
JobBuilder.newJob(ScheduledArchiveJob.class)
219+
.withIdentity(rule.name, target.jvmId)
220+
.build();
221+
222+
if (jobs.contains(jobDetail.getKey())) {
223+
return;
224+
}
225+
226+
int initialDelay = rule.initialDelaySeconds;
227+
int archivalPeriodSeconds = rule.archivalPeriodSeconds;
228+
if (initialDelay <= 0) {
229+
initialDelay = archivalPeriodSeconds;
230+
}
231+
232+
Map<String, Object> data = jobDetail.getJobDataMap();
233+
data.put("rule", rule.id);
234+
data.put("target", target.id);
235+
data.put("recording", recording.remoteId);
236+
237+
Trigger trigger =
238+
TriggerBuilder.newTrigger()
239+
.withIdentity(rule.name, target.jvmId)
240+
.usingJobData(jobDetail.getJobDataMap())
241+
.withSchedule(
242+
SimpleScheduleBuilder.simpleSchedule()
243+
.withIntervalInSeconds(archivalPeriodSeconds)
244+
.repeatForever()
245+
.withMisfireHandlingInstructionNowWithExistingCount())
246+
.startAt(new Date(System.currentTimeMillis() + initialDelay * 1000))
247+
.build();
248+
try {
249+
quartz.scheduleJob(jobDetail, trigger);
250+
} catch (SchedulerException e) {
251+
logger.errorv(
252+
e,
253+
"Failed to schedule archival job for rule {0} in target {1}",
254+
rule.name,
255+
target.alias);
256+
}
257+
jobs.add(jobDetail.getKey());
258+
}
259+
}

0 commit comments

Comments
 (0)