Skip to content

Commit ad965a2

Browse files
authored
rebalance notifications to trace, fix timers on dead actors (#72)
1 parent 5b91020 commit ad965a2

File tree

2 files changed

+47
-26
lines changed

2 files changed

+47
-26
lines changed

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 40 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -15,11 +15,10 @@
1515
*/
1616
package com.arpnetworking.clusteraggregator.aggregation;
1717

18-
import akka.actor.AbstractActor;
18+
import akka.actor.AbstractActorWithTimers;
1919
import akka.actor.ActorRef;
2020
import akka.actor.Props;
2121
import akka.actor.ReceiveTimeout;
22-
import akka.actor.Scheduler;
2322
import akka.cluster.sharding.ShardRegion;
2423
import com.arpnetworking.clusteraggregator.AggregatorLifecycle;
2524
import com.arpnetworking.clusteraggregator.models.CombinedMetricData;
@@ -58,7 +57,7 @@
5857
*
5958
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
6059
*/
61-
public class StreamingAggregator extends AbstractActor {
60+
public class StreamingAggregator extends AbstractActorWithTimers {
6261

6362
/**
6463
* Creates a <code>Props</code> for use in Akka.
@@ -96,21 +95,9 @@ public StreamingAggregator(
9695
_clusterHostSuffix = clusterHostSuffix;
9796
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
9897

99-
final Scheduler scheduler = getContext().system().scheduler();
100-
scheduler.schedule(
101-
FiniteDuration.apply(5, TimeUnit.SECONDS),
102-
FiniteDuration.apply(5, TimeUnit.SECONDS),
103-
getSelf(),
104-
new BucketCheck(),
105-
getContext().dispatcher(),
106-
getSelf());
107-
scheduler.schedule(
108-
FiniteDuration.apply(5, TimeUnit.SECONDS),
109-
FiniteDuration.apply(1, TimeUnit.HOURS),
110-
getSelf(),
111-
new UpdateBookkeeper(),
112-
getContext().dispatcher(),
113-
getSelf());
98+
timers().startPeriodicTimer(BUCKET_CHECK_TIMER_KEY, BucketCheck.getInstance(), FiniteDuration.apply(5, TimeUnit.SECONDS));
99+
timers().startSingleTimer(BOOKKEEPER_UPDATE_TIMER_KEY, UpdateBookkeeper.getInstance(), FiniteDuration.apply(5, TimeUnit.SECONDS));
100+
114101
_emitter = emitter;
115102
}
116103

@@ -188,10 +175,12 @@ public Receive createReceive() {
188175
}
189176
_statistics.clear();
190177
}
178+
timers().startSingleTimer(BOOKKEEPER_UPDATE_TIMER_KEY, UpdateBookkeeper.getInstance(),
179+
FiniteDuration.apply(1, TimeUnit.HOURS));
191180
})
192181
.match(ShutdownAggregator.class, message -> context().stop(self()))
193182
.match(ReceiveTimeout.class, message -> {
194-
getContext().parent().tell(new ShardRegion.Passivate(new ShutdownAggregator()), getSelf());
183+
getContext().parent().tell(new ShardRegion.Passivate(ShutdownAggregator.getInstance()), getSelf());
195184
})
196185
.build();
197186
}
@@ -332,16 +321,48 @@ private String createHost() {
332321
!(entry.getKey().equals(CombinedMetricData.CLUSTER_KEY)
333322
|| entry.getKey().equals(CombinedMetricData.HOST_KEY)
334323
|| entry.getKey().equals(CombinedMetricData.SERVICE_KEY));
324+
private static final String BUCKET_CHECK_TIMER_KEY = "bucketcheck";
325+
private static final String BOOKKEEPER_UPDATE_TIMER_KEY = "updatebookkeeper";
335326

336327
private static final class BucketCheck implements Serializable {
328+
/**
329+
* Gets the singleton instance.
330+
*
331+
* @return singleton instance
332+
*/
333+
public static BucketCheck getInstance() {
334+
return INSTANCE;
335+
}
336+
337+
private static final BucketCheck INSTANCE = new BucketCheck();
337338
private static final long serialVersionUID = 1L;
338339
}
339340

340341
private static final class UpdateBookkeeper implements Serializable {
342+
/**
343+
* Gets the singleton instance.
344+
*
345+
* @return singleton instance
346+
*/
347+
public static UpdateBookkeeper getInstance() {
348+
return INSTANCE;
349+
}
350+
351+
private static final UpdateBookkeeper INSTANCE = new UpdateBookkeeper();
341352
private static final long serialVersionUID = 1L;
342353
}
343354

344355
private static final class ShutdownAggregator implements Serializable {
356+
/**
357+
* Gets the singleton instance.
358+
*
359+
* @return singleton instance
360+
*/
361+
public static ShutdownAggregator getInstance() {
362+
return INSTANCE;
363+
}
364+
365+
private static final ShutdownAggregator INSTANCE = new ShutdownAggregator();
345366
private static final long serialVersionUID = 1L;
346367
}
347368
}

src/main/java/com/arpnetworking/utility/ParallelLeastShardAllocationStrategy.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -115,15 +115,15 @@ public Future<Set<String>> rebalance(
115115

116116
// Make sure that we have more than 1 region
117117
if (leastShards == null || mostShards == null) {
118-
LOGGER.debug()
118+
LOGGER.trace()
119119
.setMessage("Cannot rebalance shards, less than 2 shard regions found.")
120120
.log();
121121
break;
122122
}
123123

124124
// Make sure that the difference is enough to warrant a rebalance
125125
if (mostShards.getEffectiveShardCount() - leastShards.getEffectiveShardCount() < _rebalanceThreshold) {
126-
LOGGER.debug()
126+
LOGGER.trace()
127127
.setMessage("Not rebalancing any (more) shards, shard region with most shards already balanced with least")
128128
.addData("most", mostShards.getEffectiveShardCount())
129129
.addData("least", leastShards.getEffectiveShardCount())
@@ -155,12 +155,12 @@ public Future<Set<String>> rebalance(
155155
currentAllocations,
156156
rebalanceInProgress,
157157
_pendingRebalances);
158-
LOGGER.debug()
159-
.setMessage("Broadcasting rebalance info")
160-
.addData("target", _notify)
161-
.addData("shardAllocations", notification)
162-
.log();
163158
if (_notify.isPresent()) {
159+
LOGGER.trace()
160+
.setMessage("Broadcasting rebalance info")
161+
.addData("target", _notify)
162+
.addData("shardAllocations", notification)
163+
.log();
164164
_notify.get().tell(notification, ActorRef.noSender());
165165
}
166166
return Futures.successful(toRebalance);

0 commit comments

Comments
 (0)