From 4975d280cef8761398708bfc23b3c77f970cc2a3 Mon Sep 17 00:00:00 2001 From: Brandon Arp Date: Thu, 5 Jun 2025 09:10:58 -0700 Subject: [PATCH] two phase shutdown to prevent race conditions --- .../arpnetworking/metrics/mad/Aggregator.java | 17 ++++++++++++++--- .../arpnetworking/metrics/mad/PeriodWorker.java | 7 +++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java b/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java index 6f4772ef..40ac6d13 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java +++ b/src/main/java/com/arpnetworking/metrics/mad/Aggregator.java @@ -44,7 +44,6 @@ import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.actor.PoisonPill; import org.apache.pekko.actor.Props; import org.apache.pekko.pattern.Patterns; @@ -300,6 +299,18 @@ public Key getKey() { private final Key _key; } + static final class PeriodWorkerShutdown { + + private PeriodWorkerShutdown() { + } + + public static PeriodWorkerShutdown getInstance() { + return INSTANCE; + } + + private static final PeriodWorkerShutdown INSTANCE = new PeriodWorkerShutdown(); + } + /** * Internal actor to process requests. */ @@ -348,7 +359,7 @@ private void shutdown() { // Start period worker shutdown final List> periodWorkerShutdown = new ArrayList<>(); for (final List workers : _periodWorkerActors.values()) { - periodWorkerShutdown.addAll(_aggregator.shutdownActors(workers, PoisonPill.getInstance())); + periodWorkerShutdown.addAll(_aggregator.shutdownActors(workers, PeriodWorkerShutdown.getInstance())); } // Wait for shutdown @@ -394,7 +405,7 @@ private void idleWorker(final PeriodWorkerIdle idle) { .log(); for (final ActorRef worker : workers) { - worker.tell(PoisonPill.getInstance(), self()); + worker.tell(PeriodWorkerShutdown.getInstance(), self()); } } } diff --git a/src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java b/src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java index 27efcb7a..9acd6b13 100644 --- a/src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java +++ b/src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java @@ -26,6 +26,7 @@ import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.AbstractActorWithTimers; import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.PoisonPill; import org.apache.pekko.actor.Props; import java.time.Duration; @@ -138,6 +139,7 @@ public void preRestart(final Throwable reason, final Optional message) { public AbstractActor.Receive createReceive() { return receiveBuilder() .match(Record.class, this::processRecord) + .match(Aggregator.PeriodWorkerShutdown.class, this::shutdown) .matchEquals(ROTATE_MESSAGE, m -> rotateAndSchedule()) .matchEquals(IDLE_CHECK_MESSAGE, m -> checkForIdle()) .build(); @@ -163,6 +165,11 @@ private void checkForIdle() { _hasReceivedRecords = false; } + private void shutdown(final Aggregator.PeriodWorkerShutdown shutdown) { + timers().cancelAll(); + self().tell(PoisonPill.getInstance(), self()); + } + private void scheduleRotation(final ZonedDateTime now) { if (timers().isTimerActive(ROTATE_TIMER_KEY)) { timers().cancel(ROTATE_TIMER_KEY);