Skip to content

Commit 4cefc05

Browse files
lukewhitinggeorgewallace
authored and committed
elastic#111433 Watch Next Run Interval Resets On Shard Move or Node Restart (elastic#115102)
* Switch Watcher scheduler to use last exec time when restarting, moving shards or resuming from stopped.
* Add tests for last runtime calculation
* Update docs/changelog/115102.yaml
* Add counter to watcher job executions to check no additional executions happen during test
1 parent db68952 commit 4cefc05

File tree

3 files changed

+283
-2
lines changed

3 files changed

+283
-2
lines changed

docs/changelog/115102.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 115102
2+
summary: Watch Next Run Interval Resets On Shard Move or Node Restart
3+
area: Watcher
4+
type: bug
5+
issues:
6+
- 111433

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils;
1818
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
1919
import org.elasticsearch.xpack.core.watcher.watch.Watch;
20+
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
21+
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
2022
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
2123
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
2224
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
@@ -32,6 +34,7 @@
3234
import java.util.Collections;
3335
import java.util.List;
3436
import java.util.Map;
37+
import java.util.Optional;
3538
import java.util.concurrent.ConcurrentHashMap;
3639
import java.util.concurrent.CountDownLatch;
3740
import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,7 +70,11 @@ public synchronized void start(Collection<Watch> jobs) {
6770
Map<String, ActiveSchedule> startingSchedules = Maps.newMapWithExpectedSize(jobs.size());
6871
for (Watch job : jobs) {
6972
if (job.trigger() instanceof ScheduleTrigger trigger) {
70-
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
73+
if (trigger.getSchedule() instanceof IntervalSchedule) {
74+
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), calculateLastStartTime(job)));
75+
} else {
76+
startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
77+
}
7178
}
7279
}
7380
// why are we calling putAll() here instead of assigning a brand
@@ -108,10 +115,39 @@ public void add(Watch watch) {
108115
// watcher indexing listener
109116
// this also means that updating an existing watch would not retrigger the schedule time, if it remains the same schedule
110117
if (currentSchedule == null || currentSchedule.schedule.equals(trigger.getSchedule()) == false) {
111-
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
118+
if (trigger.getSchedule() instanceof IntervalSchedule) {
119+
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), calculateLastStartTime(watch)));
120+
} else {
121+
schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
122+
}
123+
112124
}
113125
}
114126

127+
/**
128+
* Attempts to calculate the epoch millis of the last time the watch was checked. If the watch has never been checked, the timestamp of
129+
* the watch's status state is used. If neither value is available (including a null watch or a null status), the current time is used.
130+
* @param job the watch to calculate the last start time for
131+
* @return the epoch millis of the last check, else of the status state timestamp, else the current clock time
132+
*/
133+
private long calculateLastStartTime(Watch job) {
134+
// Null-safe walk job -> status -> lastChecked -> epoch millis; the Optional is empty as soon as any step yields null.
var lastChecked = Optional.ofNullable(job)
135+
.map(Watch::status)
136+
.map(WatchStatus::lastChecked)
137+
.map(ZonedDateTime::toInstant)
138+
.map(Instant::toEpochMilli);
139+
140+
// The fallback below only runs when no last-checked time was found: it repeats the walk using the state timestamp instead.
return lastChecked.orElseGet(
141+
() -> Optional.ofNullable(job)
142+
.map(Watch::status)
143+
.map(WatchStatus::state)
144+
.map(WatchStatus.State::getTimestamp)
145+
.map(ZonedDateTime::toInstant)
146+
.map(Instant::toEpochMilli)
147+
// orElse takes a value, so clock.millis() is read every time this fallback runs, even when a state timestamp exists
.orElse(clock.millis())
148+
);
149+
}
150+
115151
@Override
116152
public boolean remove(String jobId) {
117153
logger.debug("Removing watch [{}] from engine (engine is running: {})", jobId, isRunning.get());

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
1313
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
1414
import org.elasticsearch.xpack.core.watcher.watch.Watch;
15+
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
1516
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
1617
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
1718
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
@@ -283,6 +284,244 @@ public void testAddOnlyWithNewSchedule() {
283284
assertThat(engine.getSchedules().get("_id"), not(is(activeSchedule)));
284285
}
285286

287+
/**
288+
* This test verifies that a watch with a valid lastCheckedTime executes before the interval time to ensure the job resumes waiting
289+
* from the same point it left off before the reallocation / restart.
* The watch uses a 1s interval and is passed to the engine via start(); two trigger events are awaited and exactly two are expected.
290+
*/
291+
public void testWatchWithLastCheckedTimeExecutesBeforeInitialInterval() throws Exception {
292+
final var firstLatch = new CountDownLatch(1);
293+
final var secondLatch = new CountDownLatch(1);
294+
295+
Watch watch = new Watch(
296+
"watch",
297+
new ScheduleTrigger(interval("1s")),
298+
new ExecutableNoneInput(),
299+
InternalAlwaysCondition.INSTANCE,
300+
null,
301+
null,
302+
Collections.emptyList(),
303+
null,
304+
// the only timestamp given to the status is 500ms before the current clock; per the test description this is the lastCheckedTime
new WatchStatus(-1L, null, null, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC), null, null, null),
305+
SequenceNumbers.UNASSIGNED_SEQ_NO,
306+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
307+
);
308+
309+
var watches = Collections.singletonList(watch);
310+
311+
var runCount = new AtomicInteger(0);
312+
313+
// count every trigger event: the first one releases firstLatch, every later one releases secondLatch
engine.register(events -> {
314+
for (TriggerEvent ignored : events) {
315+
if (runCount.get() == 0) {
316+
logger.info("job first fire");
317+
firstLatch.countDown();
318+
} else {
319+
logger.info("job second fire");
320+
secondLatch.countDown();
321+
}
322+
runCount.incrementAndGet();
323+
}
324+
});
325+
326+
engine.start(watches);
327+
// request a clock position 510ms ahead (advanceClockIfNeeded is defined elsewhere; presumably it moves a mocked clock), then wait up to 3s
advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
328+
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
329+
fail("waiting too long for all watches to be triggered");
330+
}
331+
332+
// a further 1100ms (more than the 1s interval) is requested before waiting for the second event
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
333+
if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
334+
fail("waiting too long for all watches to be triggered");
335+
}
336+
337+
// the counter guards against additional, unexpected executions
assertThat(runCount.get(), is(2));
338+
339+
engine.stop();
340+
}
341+
342+
/**
343+
* This test verifies that a watch without a lastCheckedTime but with a valid activationTime executes before the interval time to
344+
* ensure the job resumes waiting from the same point it left off before the reallocation / restart.
* The watch uses a 1s interval and is passed to the engine via start(); two trigger events are awaited and exactly two are expected.
345+
*/
346+
public void testWatchWithNoLastCheckedTimeButHasActivationTimeExecutesBeforeInitialInterval() throws Exception {
347+
final var firstLatch = new CountDownLatch(1);
348+
final var secondLatch = new CountDownLatch(1);
349+
350+
Watch watch = new Watch(
351+
"watch",
352+
new ScheduleTrigger(interval("1s")),
353+
new ExecutableNoneInput(),
354+
InternalAlwaysCondition.INSTANCE,
355+
null,
356+
null,
357+
Collections.emptyList(),
358+
null,
359+
// only the State carries a timestamp (500ms before the current clock); every other status argument after the first is null
new WatchStatus(
360+
-1L,
361+
// presumably (active = true, activation timestamp) — confirm against WatchStatus.State
new WatchStatus.State(true, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC)),
362+
null,
363+
null,
364+
null,
365+
null,
366+
null
367+
),
368+
SequenceNumbers.UNASSIGNED_SEQ_NO,
369+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
370+
);
371+
372+
var watches = Collections.singletonList(watch);
373+
374+
var runCount = new AtomicInteger(0);
375+
376+
// count every trigger event: the first one releases firstLatch, every later one releases secondLatch
engine.register(events -> {
377+
for (TriggerEvent ignored : events) {
378+
if (runCount.get() == 0) {
379+
logger.info("job first fire");
380+
firstLatch.countDown();
381+
} else {
382+
logger.info("job second fire");
383+
secondLatch.countDown();
384+
}
385+
runCount.incrementAndGet();
386+
}
387+
});
388+
389+
engine.start(watches);
390+
// request a clock position 510ms ahead (advanceClockIfNeeded is defined elsewhere; presumably it moves a mocked clock), then wait up to 3s
advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
391+
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
392+
fail("waiting too long for all watches to be triggered");
393+
}
394+
395+
// a further 1100ms (more than the 1s interval) is requested before waiting for the second event
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
396+
if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
397+
fail("waiting too long for all watches to be triggered");
398+
}
399+
400+
// the counter guards against additional, unexpected executions
assertThat(runCount.get(), is(2));
401+
402+
engine.stop();
403+
}
404+
405+
/**
406+
* This test verifies that a watch added after service start with a lastCheckedTime executes before the interval time to ensure the job
407+
* resumes waiting from the same point it left off before the reallocation / restart.
* The engine is started with no watches and the 1s-interval watch is handed over through add(); exactly two trigger events are expected.
408+
*/
409+
public void testAddWithLastCheckedTimeExecutesBeforeInitialInterval() throws Exception {
410+
final var firstLatch = new CountDownLatch(1);
411+
final var secondLatch = new CountDownLatch(1);
412+
413+
Watch watch = new Watch(
414+
"watch",
415+
new ScheduleTrigger(interval("1s")),
416+
new ExecutableNoneInput(),
417+
InternalAlwaysCondition.INSTANCE,
418+
null,
419+
null,
420+
Collections.emptyList(),
421+
null,
422+
// the only timestamp given to the status is 500ms before the current clock; per the test description this is the lastCheckedTime
new WatchStatus(-1L, null, null, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC), null, null, null),
423+
SequenceNumbers.UNASSIGNED_SEQ_NO,
424+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
425+
);
426+
427+
var runCount = new AtomicInteger(0);
428+
429+
// count every trigger event: the first one releases firstLatch, every later one releases secondLatch
engine.register(events -> {
430+
for (TriggerEvent ignored : events) {
431+
if (runCount.get() == 0) {
432+
logger.info("job first fire");
433+
firstLatch.countDown();
434+
} else {
435+
logger.info("job second fire");
436+
secondLatch.countDown();
437+
}
438+
runCount.incrementAndGet();
439+
}
440+
});
441+
442+
engine.start(Collections.emptyList());
443+
// NOTE(review): the status timestamp was taken before this 1100ms advance, so if the clock really moves it is presumably ~1600ms
// old (more than one interval) by the time add() runs — confirm this still exercises the "resume mid-interval" case
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
444+
engine.add(watch);
445+
446+
// request a clock position 510ms ahead (advanceClockIfNeeded is defined elsewhere; presumably it moves a mocked clock), then wait up to 3s
advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
447+
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
448+
fail("waiting too long for all watches to be triggered");
449+
}
450+
451+
// a further 1100ms (more than the 1s interval) is requested before waiting for the second event
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
452+
if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
453+
fail("waiting too long for all watches to be triggered");
454+
}
455+
456+
// the counter guards against additional, unexpected executions
assertThat(runCount.get(), is(2));
457+
458+
engine.stop();
459+
}
460+
461+
/**
462+
* This test verifies that a watch added after service start without a lastCheckedTime but with a valid activationTime executes before
463+
* the interval time to ensure the job resumes waiting from the same point it left off before the reallocation / restart.
* The engine is started with no watches and the 1s-interval watch is handed over through add(); exactly two trigger events are expected.
464+
*/
465+
public void testAddWithNoLastCheckedTimeButHasActivationTimeExecutesBeforeInitialInterval() throws Exception {
466+
final var firstLatch = new CountDownLatch(1);
467+
final var secondLatch = new CountDownLatch(1);
468+
469+
Watch watch = new Watch(
470+
"watch",
471+
new ScheduleTrigger(interval("1s")),
472+
new ExecutableNoneInput(),
473+
InternalAlwaysCondition.INSTANCE,
474+
null,
475+
null,
476+
Collections.emptyList(),
477+
null,
478+
// only the State carries a timestamp (500ms before the current clock); every other status argument after the first is null
new WatchStatus(
479+
-1L,
480+
// presumably (active = true, activation timestamp) — confirm against WatchStatus.State
new WatchStatus.State(true, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC)),
481+
null,
482+
null,
483+
null,
484+
null,
485+
null
486+
),
487+
SequenceNumbers.UNASSIGNED_SEQ_NO,
488+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
489+
);
490+
491+
var runCount = new AtomicInteger(0);
492+
493+
// count every trigger event: the first one releases firstLatch, every later one releases secondLatch
engine.register(events -> {
494+
for (TriggerEvent ignored : events) {
495+
if (runCount.get() == 0) {
496+
logger.info("job first fire");
497+
firstLatch.countDown();
498+
} else {
499+
logger.info("job second fire");
500+
secondLatch.countDown();
501+
}
502+
runCount.incrementAndGet();
503+
}
504+
});
505+
506+
engine.start(Collections.emptyList());
507+
// NOTE(review): the state timestamp was taken before this 1100ms advance, so if the clock really moves it is presumably ~1600ms
// old (more than one interval) by the time add() runs — confirm this still exercises the "resume mid-interval" case
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
508+
engine.add(watch);
509+
510+
// request a clock position 510ms ahead (advanceClockIfNeeded is defined elsewhere; presumably it moves a mocked clock), then wait up to 3s
advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
511+
if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
512+
fail("waiting too long for all watches to be triggered");
513+
}
514+
515+
// a further 1100ms (more than the 1s interval) is requested before waiting for the second event
advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
516+
if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
517+
fail("waiting too long for all watches to be triggered");
518+
}
519+
520+
// the counter guards against additional, unexpected executions
assertThat(runCount.get(), is(2));
521+
522+
engine.stop();
523+
}
524+
286525
private Watch createWatch(String name, Schedule schedule) {
287526
return new Watch(
288527
name,

0 commit comments

Comments
 (0)