|
17 | 17 | import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils; |
18 | 18 | import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; |
19 | 19 | import org.elasticsearch.xpack.core.watcher.watch.Watch; |
| 20 | +import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; |
| 21 | +import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; |
20 | 22 | import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule; |
21 | 23 | import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry; |
22 | 24 | import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; |
|
32 | 34 | import java.util.Collections; |
33 | 35 | import java.util.List; |
34 | 36 | import java.util.Map; |
| 37 | +import java.util.Optional; |
35 | 38 | import java.util.concurrent.ConcurrentHashMap; |
36 | 39 | import java.util.concurrent.CountDownLatch; |
37 | 40 | import java.util.concurrent.atomic.AtomicBoolean; |
@@ -67,7 +70,17 @@ public synchronized void start(Collection<Watch> jobs) { |
67 | 70 | Map<String, ActiveSchedule> startingSchedules = Maps.newMapWithExpectedSize(jobs.size()); |
68 | 71 | for (Watch job : jobs) { |
69 | 72 | if (job.trigger() instanceof ScheduleTrigger trigger) { |
70 | | - startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime)); |
| 73 | + if (trigger.getSchedule() instanceof IntervalSchedule) { |
| 74 | + long firstActivated = Optional.ofNullable(job.status()) |
| 75 | + .map(WatchStatus::state) |
| 76 | + .map(WatchStatus.State::getTimestamp) |
| 77 | + .map(ZonedDateTime::toInstant) |
| 78 | + .map(Instant::toEpochMilli) |
| 79 | + .orElse(startTime); |
| 80 | + startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), firstActivated)); |
| 81 | + } else { |
| 82 | + startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime)); |
| 83 | + } |
71 | 84 | } |
72 | 85 | } |
73 | 86 | // why are we calling putAll() here instead of assigning a brand |
@@ -108,7 +121,18 @@ public void add(Watch watch) { |
108 | 121 | // watcher indexing listener |
109 | 122 | // this also means that updating an existing watch would not retrigger the schedule time, if it remains the same schedule |
110 | 123 | if (currentSchedule == null || currentSchedule.schedule.equals(trigger.getSchedule()) == false) { |
111 | | - schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis())); |
| 124 | + if (trigger.getSchedule() instanceof IntervalSchedule) { |
| 125 | + long firstActivated = Optional.ofNullable(watch.status()) |
| 126 | + .map(WatchStatus::state) |
| 127 | + .map(WatchStatus.State::getTimestamp) |
| 128 | + .map(ZonedDateTime::toInstant) |
| 129 | + .map(Instant::toEpochMilli) |
| 130 | + .orElse(clock.millis()); |
| 131 | + schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), firstActivated)); |
| 132 | + } else { |
| 133 | + schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis())); |
| 134 | + } |
| 135 | + |
112 | 136 | } |
113 | 137 | } |
114 | 138 |
|
|
0 commit comments