Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/main/java/com/arpnetworking/metrics/mad/Aggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.PoisonPill;
import org.apache.pekko.actor.Props;
import org.apache.pekko.pattern.Patterns;

Expand Down Expand Up @@ -300,6 +299,18 @@ public Key getKey() {
private final Key _key;
}

static final class PeriodWorkerShutdown {

private PeriodWorkerShutdown() {
}

public static PeriodWorkerShutdown getInstance() {
return INSTANCE;
}

private static final PeriodWorkerShutdown INSTANCE = new PeriodWorkerShutdown();
}

/**
* Internal actor to process requests.
*/
Expand Down Expand Up @@ -348,7 +359,7 @@ private void shutdown() {
// Start period worker shutdown
final List<CompletableFuture<Boolean>> periodWorkerShutdown = new ArrayList<>();
for (final List<ActorRef> workers : _periodWorkerActors.values()) {
periodWorkerShutdown.addAll(_aggregator.shutdownActors(workers, PoisonPill.getInstance()));
periodWorkerShutdown.addAll(_aggregator.shutdownActors(workers, PeriodWorkerShutdown.getInstance()));
}

// Wait for shutdown
Expand Down Expand Up @@ -394,7 +405,7 @@ private void idleWorker(final PeriodWorkerIdle idle) {
.log();

for (final ActorRef worker : workers) {
worker.tell(PoisonPill.getInstance(), self());
worker.tell(PeriodWorkerShutdown.getInstance(), self());
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.AbstractActorWithTimers;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.PoisonPill;
import org.apache.pekko.actor.Props;

import java.time.Duration;
Expand Down Expand Up @@ -138,6 +139,7 @@ public void preRestart(final Throwable reason, final Optional<Object> message) {
public AbstractActor.Receive createReceive() {
return receiveBuilder()
.match(Record.class, this::processRecord)
.match(Aggregator.PeriodWorkerShutdown.class, this::shutdown)
.matchEquals(ROTATE_MESSAGE, m -> rotateAndSchedule())
.matchEquals(IDLE_CHECK_MESSAGE, m -> checkForIdle())
.build();
Expand All @@ -163,6 +165,11 @@ private void checkForIdle() {
_hasReceivedRecords = false;
}

private void shutdown(final Aggregator.PeriodWorkerShutdown shutdown) {
timers().cancelAll();
self().tell(PoisonPill.getInstance(), self());
}

private void scheduleRotation(final ZonedDateTime now) {
if (timers().isTimerActive(ROTATE_TIMER_KEY)) {
timers().cancel(ROTATE_TIMER_KEY);
Expand Down