Skip to content

Commit 86d9d9f

Browse files
authored
update akka 2.5.32, use built-in shard passivation instead of rolling our own (#139)
This reverts most of #136, with the exception of the configuration plumbing. akka 2.5 already has a built-in mechanism for automated actor passivation. It will also handle buffering messages to the actor in case it is resurrected (e.g. if the timeout fires but it is given another message). This required updating akka to the latest patch release (2.5.16 -> 2.5.32). As a consequence a few deprecated calls were updated, notably PatternsCS -> Patterns.
1 parent 8fbb924 commit 86d9d9f

File tree

13 files changed

+37
-167
lines changed

13 files changed

+37
-167
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@
101101

102102
<properties>
103103
<!--Dependency versions-->
104-
<akka.version>2.5.16</akka.version>
104+
<akka.version>2.5.32</akka.version>
105105
<akka.http.version>10.1.5</akka.http.version>
106106
<akka.persistence.cassandra.version>0.89</akka.persistence.cassandra.version>
107107
<akka.kryo.version>0.5.2</akka.kryo.version>

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,7 @@ private java.util.concurrent.CompletionStage<akka.http.javadsl.ServerBinding> pr
262262
final akka.stream.javadsl.Source<IncomingConnection, CompletionStage<ServerBinding>> binding = http.bind(
263263
ConnectHttp.toHost(
264264
_configuration.getHttpHost(),
265-
_configuration.getHttpPort()),
266-
materializer);
265+
_configuration.getHttpPort()));
267266
return binding.to(
268267
akka.stream.javadsl.Sink.foreach(
269268
connection -> connection.handleWithAsyncHandler(routes, materializer)))
@@ -285,13 +284,20 @@ private ActorRef providePeriodicStatsActor(final ActorSystem system, final Metri
285284
private ActorRef provideAggregatorShardRegion(
286285
final ActorSystem system,
287286
final Injector injector,
288-
final AggMessageExtractor extractor) {
287+
final AggMessageExtractor extractor,
288+
@Named("aggregator-liveliness-timeout") final Duration livelinessTimeout) {
289289
final ClusterSharding clusterSharding = ClusterSharding.get(system);
290290
final RebalanceConfiguration rebalanceConfiguration = _configuration.getRebalanceConfiguration();
291+
292+
final ClusterShardingSettings settings =
293+
ClusterShardingSettings
294+
.create(system)
295+
.withPassivateIdleAfter(livelinessTimeout);
296+
291297
return clusterSharding.start(
292298
"Aggregator",
293299
GuiceActorCreator.props(injector, AggregationRouter.class),
294-
ClusterShardingSettings.create(system),
300+
settings,
295301
extractor,
296302
new ParallelLeastShardAllocationStrategy(
297303
rebalanceConfiguration.getMaxParallel(),
@@ -365,7 +371,6 @@ private Duration provideLivelinessTimeout(final ClusterAggregatorConfiguration c
365371
return config.getAggregatorLivelinessTimeout();
366372
}
367373

368-
@Provides
369374
@Named("circonus-partition-set")
370375
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
371376
private PartitionSet provideDatabasePartitionSet(final Injector injector) {

src/main/java/com/arpnetworking/clusteraggregator/Status.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import akka.actor.Props;
2222
import akka.cluster.Cluster;
2323
import akka.cluster.MemberStatus;
24-
import akka.pattern.PatternsCS;
24+
import akka.pattern.Patterns;
2525
import akka.remote.AssociationErrorEvent;
2626
import com.arpnetworking.clusteraggregator.models.MetricsRequest;
2727
import com.arpnetworking.clusteraggregator.models.PeriodMetrics;
@@ -106,7 +106,7 @@ public Receive createReceive() {
106106
private void processStatusRequest() {
107107
// Call the bookkeeper
108108
final CompletableFuture<ClusterStatusCache.StatusResponse> clusterStateFuture =
109-
PatternsCS.ask(
109+
Patterns.ask(
110110
_clusterStatusCache,
111111
new ClusterStatusCache.GetRequest(),
112112
Duration.ofSeconds(3))
@@ -115,15 +115,15 @@ private void processStatusRequest() {
115115
.toCompletableFuture();
116116

117117
final CompletableFuture<Map<Duration, PeriodMetrics>> localMetricsFuture =
118-
PatternsCS.ask(
118+
Patterns.ask(
119119
_localMetrics,
120120
new MetricsRequest(),
121121
Duration.ofSeconds(3))
122122
.<Map<Duration, PeriodMetrics>>thenApply(new CastMapper<>())
123123
.exceptionally(new AsNullRecovery<>())
124124
.toCompletableFuture();
125125

126-
PatternsCS.pipe(
126+
Patterns.pipe(
127127
CompletableFuture.allOf(
128128
clusterStateFuture.toCompletableFuture(),
129129
localMetricsFuture.toCompletableFuture())

src/main/java/com/arpnetworking/clusteraggregator/aggregation/AggregationRouter.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public class AggregationRouter extends AbstractActor {
4949
* @param reaggregationDimensions The dimensions to reaggregate over.
5050
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
5151
* @param aggregatorTimeout The time to wait from the start of the period for all data.
52-
* @param livelinessTimeout How often to check that a child is still receiving data.
5352
* @return A new {@link Props}.
5453
*/
5554
public static Props props(
@@ -58,17 +57,15 @@ public static Props props(
5857
final String clusterHostSuffix,
5958
final ImmutableSet<String> reaggregationDimensions,
6059
final boolean injectClusterAsHost,
61-
final Duration aggregatorTimeout,
62-
final Duration livelinessTimeout) {
60+
final Duration aggregatorTimeout) {
6361
return Props.create(
6462
AggregationRouter.class,
6563
metricsListener,
6664
emitter,
6765
clusterHostSuffix,
6866
reaggregationDimensions,
6967
injectClusterAsHost,
70-
aggregatorTimeout,
71-
livelinessTimeout);
68+
aggregatorTimeout);
7269
}
7370

7471
/**
@@ -80,7 +77,6 @@ public static Props props(
8077
* @param reaggregationDimensions The dimensions to reaggregate over.
8178
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
8279
* @param aggregatorTimeout The time to wait from the start of the period for all data.
83-
* @param livelinessTimeout How often to check that a child is still receiving data.
8480
*/
8581
@Inject
8682
public AggregationRouter(
@@ -89,18 +85,15 @@ public AggregationRouter(
8985
@Named("cluster-host-suffix") final String clusterHostSuffix,
9086
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
9187
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost,
92-
@Named("reaggregation-timeout") final Duration aggregatorTimeout,
93-
@Named("aggregator-liveliness-timeout") final Duration livelinessTimeout) {
94-
_streamingChild = context().actorOf(
88+
@Named("reaggregation-timeout") final Duration aggregatorTimeout) {
89+
_streamingChild = context().actorOf(
9590
StreamingAggregator.props(
9691
periodicStatistics,
9792
emitter,
9893
clusterHostSuffix,
9994
reaggregationDimensions,
10095
injectClusterAsHost,
101-
aggregatorTimeout,
102-
livelinessTimeout
103-
),
96+
aggregatorTimeout),
10497
"streaming");
10598
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
10699
}

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public class StreamingAggregator extends AbstractActorWithTimers {
6868
* @param reaggregationDimensions The dimensions to reaggregate over.
6969
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
7070
* @param aggregatorTimeout The time to wait from the start of the period for all data.
71-
* @param livelinessTimeout How often to check that this actor is still receiving data.
7271
* @return A new {@link Props}.
7372
*/
7473
public static Props props(
@@ -77,17 +76,15 @@ public static Props props(
7776
final String clusterHostSuffix,
7877
final ImmutableSet<String> reaggregationDimensions,
7978
final boolean injectClusterAsHost,
80-
final Duration aggregatorTimeout,
81-
final Duration livelinessTimeout) {
79+
final Duration aggregatorTimeout) {
8280
return Props.create(
8381
StreamingAggregator.class,
8482
metricsListener,
8583
emitter,
8684
clusterHostSuffix,
8785
reaggregationDimensions,
8886
injectClusterAsHost,
89-
aggregatorTimeout,
90-
livelinessTimeout);
87+
aggregatorTimeout);
9188
}
9289

9390
/**
@@ -99,7 +96,6 @@ public static Props props(
9996
* @param reaggregationDimensions The dimensions to reaggregate over.
10097
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
10198
* @param aggregatorTimeout The time to wait from the start of the period for all data.
102-
* @param livelinessTimeout How often to check that this actor is still receiving data.
10399
*/
104100
@Inject
105101
public StreamingAggregator(
@@ -108,8 +104,7 @@ public StreamingAggregator(
108104
@Named("cluster-host-suffix") final String clusterHostSuffix,
109105
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
110106
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost,
111-
@Named("reaggregation-timeout") final Duration aggregatorTimeout,
112-
@Named("aggregator-liveliness-timeout") final Duration livelinessTimeout) {
107+
@Named("reaggregation-timeout") final Duration aggregatorTimeout) {
113108
_periodicStatistics = periodicStatistics;
114109
_clusterHostSuffix = clusterHostSuffix;
115110
_reaggregationDimensions = reaggregationDimensions;
@@ -118,7 +113,6 @@ public StreamingAggregator(
118113
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
119114

120115
timers().startPeriodicTimer(BUCKET_CHECK_TIMER_KEY, BucketCheck.getInstance(), FiniteDuration.apply(5, TimeUnit.SECONDS));
121-
timers().startPeriodicTimer(LIVELINESS_CHECK_TIMER, LIVELINESS_CHECK_MSG, livelinessTimeout);
122116

123117
_emitter = emitter;
124118
}
@@ -127,8 +121,6 @@ public StreamingAggregator(
127121
public Receive createReceive() {
128122
return receiveBuilder()
129123
.match(Messages.StatisticSetRecord.class, record -> {
130-
// Mark this actor as live since we're still receiving data.
131-
_live = true;
132124
LOGGER.debug()
133125
.setMessage("Processing a StatisticSetRecord")
134126
.addData("workItem", record)
@@ -190,25 +182,10 @@ public Receive createReceive() {
190182
}
191183
}
192184
})
193-
.matchEquals(LIVELINESS_CHECK_MSG, msg -> {
194-
// If we've received data since our last check, reset and wait until another round.
195-
// otherwise shutdown.
196-
if (_live) {
197-
LOGGER.debug()
198-
.setMessage("aggregator is still live, continuing.")
199-
.addContext("actor", self())
200-
.log();
201-
_live = false;
202-
return;
203-
}
204-
LOGGER.debug()
205-
.setMessage("aggregator is stale, requesting shutdown.")
206-
.addContext("actor", self())
207-
.log();
208-
requestShutdownFromParent();
209-
})
210185
.match(ShutdownAggregator.class, message -> context().stop(self()))
211-
.match(ReceiveTimeout.class, message -> requestShutdownFromParent())
186+
.match(ReceiveTimeout.class, message -> {
187+
getContext().parent().tell(new ShardRegion.Passivate(ShutdownAggregator.getInstance()), getSelf());
188+
})
212189
.build();
213190
}
214191

@@ -332,10 +309,6 @@ private void initialize(final Messages.StatisticSetRecord data, final CombinedMe
332309
}
333310
}
334311

335-
private void requestShutdownFromParent() {
336-
getContext().parent().tell(new ShardRegion.Passivate(ShutdownAggregator.getInstance()), getSelf());
337-
}
338-
339312
private ImmutableMap<String, String> dimensionsToMap(final Messages.StatisticSetRecord statisticSetRecord) {
340313
// Build a map of dimension key-value pairs dropping any that are to be reaggregated over
341314
final ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
@@ -383,9 +356,6 @@ private String createHost() {
383356
private final Set<Statistic> _statistics = Sets.newHashSet();
384357
private final Duration _aggregatorTimeout;
385358
private boolean _initialized = false;
386-
// This actor is _live if it's received data since the last LIVELINESS_CHECK_MSG.
387-
// If this is ever false during a check, the actor will shutdown.
388-
private boolean _live = false;
389359
private Duration _period;
390360
private String _cluster;
391361
private String _metric;
@@ -397,9 +367,6 @@ private String createHost() {
397367
|| entry.getKey().equals(CombinedMetricData.HOST_KEY)
398368
|| entry.getKey().equals(CombinedMetricData.SERVICE_KEY));
399369
private static final String BUCKET_CHECK_TIMER_KEY = "bucketcheck";
400-
private static final String LIVELINESS_CHECK_TIMER = "livelinesscheck";
401-
402-
private static final String LIVELINESS_CHECK_MSG = "LIVELINESS_CHECK_MSG";
403370

404371
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingAggregator.class);
405372
private static final Logger WORK_TOO_OLD_LOGGER = LoggerFactory.getRateLimitLogger(

src/main/java/com/arpnetworking/clusteraggregator/client/HttpSourceActor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class HttpSourceActor extends AbstractActor {
8686
final ActorRef shardRegion,
8787
final ActorRef emitter,
8888
final ClusterAggregatorConfiguration configuration) {
89-
return Props.create(() -> new HttpSourceActor(shardRegion, emitter, configuration));
89+
return Props.create(HttpSourceActor.class, () -> new HttpSourceActor(shardRegion, emitter, configuration));
9090
}
9191

9292
/**

src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ public boolean validateAggregatorLivelinessTimeout(final Duration aggregatorLive
662662
private Duration _reaggregationTimeout = Duration.ofMinutes(1);
663663
@NotNull
664664
@ValidateWithMethod(methodName = "validateAggregatorLivelinessTimeout", parameterType = Duration.class)
665-
private Duration _aggregatorLivelinessTimeout = _reaggregationTimeout.multipliedBy(2);
665+
private Duration _aggregatorLivelinessTimeout = Duration.ofMinutes(10);
666666
@NotNull
667667
private File _hostPipelineConfiguration;
668668
@NotNull

src/main/java/com/arpnetworking/clusteraggregator/http/Routes.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import akka.http.javadsl.model.headers.CacheControl;
2828
import akka.http.javadsl.model.headers.CacheDirectives;
2929
import akka.japi.function.Function;
30-
import akka.pattern.PatternsCS;
30+
import akka.pattern.Patterns;
3131
import akka.util.ByteString;
3232
import com.arpnetworking.clusteraggregator.Status;
3333
import com.arpnetworking.clusteraggregator.models.StatusResponse;
@@ -224,7 +224,7 @@ private CompletionStage<HttpResponse> process(final HttpRequest request) {
224224
@SuppressWarnings("unchecked")
225225
private <T> CompletionStage<T> ask(final String actorPath, final Object request, final T defaultValue) {
226226
return
227-
PatternsCS.ask(
227+
Patterns.ask(
228228
_actorSystem.actorSelection(actorPath),
229229
request,
230230
Duration.ofSeconds(5))

src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import akka.actor.AbstractActor;
1919
import akka.actor.Props;
2020
import akka.http.javadsl.model.StatusCodes;
21-
import akka.pattern.PatternsCS;
21+
import akka.pattern.Patterns;
2222
import com.arpnetworking.logback.annotations.LogValue;
2323
import com.arpnetworking.metrics.Metrics;
2424
import com.arpnetworking.metrics.MetricsFactory;
@@ -372,7 +372,7 @@ private void fireRequest(final RequestEntry request, final int attempt) {
372372
return new PostFailure(attempt, request, err);
373373
}
374374
});
375-
PatternsCS.pipe(responsePromise, context().dispatcher()).to(self());
375+
Patterns.pipe(responsePromise, context().dispatcher()).to(self());
376376
}
377377

378378
private void scheduleRetry(final RequestEntry requestEntry, final int attempt) {

src/main/java/com/arpnetworking/tsdcore/sinks/circonus/BrokerRefresher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import akka.actor.AbstractActor;
1919
import akka.actor.Props;
20-
import akka.pattern.PatternsCS;
20+
import akka.pattern.Patterns;
2121
import com.arpnetworking.akka.UniformRandomTimeScheduler;
2222
import com.arpnetworking.steno.Logger;
2323
import com.arpnetworking.steno.LoggerFactory;
@@ -111,7 +111,7 @@ private void lookupBrokers() {
111111
final CompletionStage<Object> promise = _client.getBrokers()
112112
.<Object>thenApply(BrokerLookupComplete::new)
113113
.exceptionally(BrokerLookupFailure::new);
114-
PatternsCS.pipe(promise, context().dispatcher()).to(self());
114+
Patterns.pipe(promise, context().dispatcher()).to(self());
115115
}
116116

117117
private final CirconusClient _client;

0 commit comments

Comments
 (0)