Skip to content

Commit d9dae11

Browse files
authored
two phase shutdown to prevent race conditions (#542)
1 parent 4feca29 commit d9dae11

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

src/main/java/com/arpnetworking/metrics/mad/Aggregator.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.pekko.actor.AbstractActor;
4545
import org.apache.pekko.actor.ActorRef;
4646
import org.apache.pekko.actor.ActorSystem;
47-
import org.apache.pekko.actor.PoisonPill;
4847
import org.apache.pekko.actor.Props;
4948
import org.apache.pekko.pattern.Patterns;
5049

@@ -300,6 +299,18 @@ public Key getKey() {
300299
private final Key _key;
301300
}
302301

302+
static final class PeriodWorkerShutdown {
303+
304+
private PeriodWorkerShutdown() {
305+
}
306+
307+
public static PeriodWorkerShutdown getInstance() {
308+
return INSTANCE;
309+
}
310+
311+
private static final PeriodWorkerShutdown INSTANCE = new PeriodWorkerShutdown();
312+
}
313+
303314
/**
304315
* Internal actor to process requests.
305316
*/
@@ -348,7 +359,7 @@ private void shutdown() {
348359
// Start period worker shutdown
349360
final List<CompletableFuture<Boolean>> periodWorkerShutdown = new ArrayList<>();
350361
for (final List<ActorRef> workers : _periodWorkerActors.values()) {
351-
periodWorkerShutdown.addAll(_aggregator.shutdownActors(workers, PoisonPill.getInstance()));
362+
periodWorkerShutdown.addAll(_aggregator.shutdownActors(workers, PeriodWorkerShutdown.getInstance()));
352363
}
353364

354365
// Wait for shutdown
@@ -394,7 +405,7 @@ private void idleWorker(final PeriodWorkerIdle idle) {
394405
.log();
395406

396407
for (final ActorRef worker : workers) {
397-
worker.tell(PoisonPill.getInstance(), self());
408+
worker.tell(PeriodWorkerShutdown.getInstance(), self());
398409
}
399410
}
400411
}

src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.pekko.actor.AbstractActor;
2727
import org.apache.pekko.actor.AbstractActorWithTimers;
2828
import org.apache.pekko.actor.ActorRef;
29+
import org.apache.pekko.actor.PoisonPill;
2930
import org.apache.pekko.actor.Props;
3031

3132
import java.time.Duration;
@@ -138,6 +139,7 @@ public void preRestart(final Throwable reason, final Optional<Object> message) {
138139
public AbstractActor.Receive createReceive() {
139140
return receiveBuilder()
140141
.match(Record.class, this::processRecord)
142+
.match(Aggregator.PeriodWorkerShutdown.class, this::shutdown)
141143
.matchEquals(ROTATE_MESSAGE, m -> rotateAndSchedule())
142144
.matchEquals(IDLE_CHECK_MESSAGE, m -> checkForIdle())
143145
.build();
@@ -163,6 +165,11 @@ private void checkForIdle() {
163165
_hasReceivedRecords = false;
164166
}
165167

168+
private void shutdown(final Aggregator.PeriodWorkerShutdown shutdown) {
169+
timers().cancelAll();
170+
self().tell(PoisonPill.getInstance(), self());
171+
}
172+
166173
private void scheduleRotation(final ZonedDateTime now) {
167174
if (timers().isTimerActive(ROTATE_TIMER_KEY)) {
168175
timers().cancel(ROTATE_TIMER_KEY);

0 commit comments

Comments
 (0)