Skip to content

Commit 68ff1f5

Browse files
committed
make StreamingAggregator actors shutdown when no data is received
1 parent 2c3ad7a commit 68ff1f5

File tree

6 files changed

+191
-12
lines changed

6 files changed

+191
-12
lines changed

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,13 @@ private Duration provideReaggregationTimeout(final ClusterAggregatorConfiguratio
358358
return config.getReaggregationTimeout();
359359
}
360360

361+
@Provides
362+
@Named("aggregator-liveliness-timeout")
363+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
364+
private Duration provideLivelinessTimeout(final ClusterAggregatorConfiguration config) {
365+
return config.getAggregatorLivelinessTimeout();
366+
}
367+
361368
@Provides
362369
@Named("circonus-partition-set")
363370
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice

src/main/java/com/arpnetworking/clusteraggregator/aggregation/AggregationRouter.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class AggregationRouter extends AbstractActor {
4949
* @param reaggregationDimensions The dimensions to reaggregate over.
5050
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
5151
* @param aggregatorTimeout The time to wait from the start of the period for all data.
52+
* @param livelinessTimeout How often to check that a child is still receiving data.
5253
* @return A new {@link Props}.
5354
*/
5455
public static Props props(
@@ -57,15 +58,17 @@ public static Props props(
5758
final String clusterHostSuffix,
5859
final ImmutableSet<String> reaggregationDimensions,
5960
final boolean injectClusterAsHost,
60-
final Duration aggregatorTimeout) {
61+
final Duration aggregatorTimeout,
62+
final Duration livelinessTimeout) {
6163
return Props.create(
6264
AggregationRouter.class,
6365
metricsListener,
6466
emitter,
6567
clusterHostSuffix,
6668
reaggregationDimensions,
6769
injectClusterAsHost,
68-
aggregatorTimeout);
70+
aggregatorTimeout,
71+
livelinessTimeout);
6972
}
7073

7174
/**
@@ -77,6 +80,7 @@ public static Props props(
7780
* @param reaggregationDimensions The dimensions to reaggregate over.
7881
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
7982
* @param aggregatorTimeout The time to wait from the start of the period for all data.
83+
* @param livelinessTimeout How often to check that a child is still receiving data.
8084
*/
8185
@Inject
8286
public AggregationRouter(
@@ -85,15 +89,18 @@ public AggregationRouter(
8589
@Named("cluster-host-suffix") final String clusterHostSuffix,
8690
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
8791
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost,
88-
@Named("reaggregation-timeout") final Duration aggregatorTimeout) {
89-
_streamingChild = context().actorOf(
92+
@Named("reaggregation-timeout") final Duration aggregatorTimeout,
93+
@Named("aggregator-liveliness-timeout") final Duration livelinessTimeout) {
94+
_streamingChild = context().actorOf(
9095
StreamingAggregator.props(
9196
periodicStatistics,
9297
emitter,
9398
clusterHostSuffix,
9499
reaggregationDimensions,
95100
injectClusterAsHost,
96-
aggregatorTimeout),
101+
aggregatorTimeout,
102+
livelinessTimeout
103+
),
97104
"streaming");
98105
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
99106
}

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class StreamingAggregator extends AbstractActorWithTimers {
6868
* @param reaggregationDimensions The dimensions to reaggregate over.
6969
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
7070
* @param aggregatorTimeout The time to wait from the start of the period for all data.
71+
* @param livelinessTimeout How often to check that this actor is still receiving data.
7172
* @return A new {@link Props}.
7273
*/
7374
public static Props props(
@@ -76,15 +77,17 @@ public static Props props(
7677
final String clusterHostSuffix,
7778
final ImmutableSet<String> reaggregationDimensions,
7879
final boolean injectClusterAsHost,
79-
final Duration aggregatorTimeout) {
80+
final Duration aggregatorTimeout,
81+
final Duration livelinessTimeout) {
8082
return Props.create(
8183
StreamingAggregator.class,
8284
metricsListener,
8385
emitter,
8486
clusterHostSuffix,
8587
reaggregationDimensions,
8688
injectClusterAsHost,
87-
aggregatorTimeout);
89+
aggregatorTimeout,
90+
livelinessTimeout);
8891
}
8992

9093
/**
@@ -96,6 +99,7 @@ public static Props props(
9699
* @param reaggregationDimensions The dimensions to reaggregate over.
97100
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
98101
* @param aggregatorTimeout The time to wait from the start of the period for all data.
102+
* @param livelinessTimeout How often to check that this actor is still receiving data.
99103
*/
100104
@Inject
101105
public StreamingAggregator(
@@ -104,7 +108,8 @@ public StreamingAggregator(
104108
@Named("cluster-host-suffix") final String clusterHostSuffix,
105109
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
106110
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost,
107-
@Named("reaggregation-timeout") final Duration aggregatorTimeout) {
111+
@Named("reaggregation-timeout") final Duration aggregatorTimeout,
112+
@Named("aggregator-liveliness-timeout") final Duration livelinessTimeout) {
108113
_periodicStatistics = periodicStatistics;
109114
_clusterHostSuffix = clusterHostSuffix;
110115
_reaggregationDimensions = reaggregationDimensions;
@@ -113,6 +118,7 @@ public StreamingAggregator(
113118
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
114119

115120
timers().startPeriodicTimer(BUCKET_CHECK_TIMER_KEY, BucketCheck.getInstance(), FiniteDuration.apply(5, TimeUnit.SECONDS));
121+
timers().startPeriodicTimer(LIVELINESS_CHECK_TIMER, LIVELINESS_CHECK_MSG, livelinessTimeout);
116122

117123
_emitter = emitter;
118124
}
@@ -121,6 +127,8 @@ public StreamingAggregator(
121127
public Receive createReceive() {
122128
return receiveBuilder()
123129
.match(Messages.StatisticSetRecord.class, record -> {
130+
// Mark this actor as live since we're still receiving data.
131+
_live = true;
124132
LOGGER.debug()
125133
.setMessage("Processing a StatisticSetRecord")
126134
.addData("workItem", record)
@@ -182,10 +190,25 @@ public Receive createReceive() {
182190
}
183191
}
184192
})
185-
.match(ShutdownAggregator.class, message -> context().stop(self()))
186-
.match(ReceiveTimeout.class, message -> {
187-
getContext().parent().tell(new ShardRegion.Passivate(ShutdownAggregator.getInstance()), getSelf());
193+
.matchEquals(LIVELINESS_CHECK_MSG, msg -> {
194+
// If we've received data since our last check, reset and wait until another round.
195+
// Otherwise, ask the parent to shut this actor down.
196+
if (_live) {
197+
LOGGER.debug()
198+
.setMessage("aggregator is still live, continuing.")
199+
.addContext("actor", self())
200+
.log();
201+
_live = false;
202+
return;
203+
}
204+
LOGGER.debug()
205+
.setMessage("aggregator is stale, requesting shutdown.")
206+
.addContext("actor", self())
207+
.log();
208+
requestShutdownFromParent();
188209
})
210+
.match(ShutdownAggregator.class, message -> context().stop(self()))
211+
.match(ReceiveTimeout.class, message -> requestShutdownFromParent())
189212
.build();
190213
}
191214

@@ -309,6 +332,10 @@ private void initialize(final Messages.StatisticSetRecord data, final CombinedMe
309332
}
310333
}
311334

335+
private void requestShutdownFromParent() {
336+
getContext().parent().tell(new ShardRegion.Passivate(ShutdownAggregator.getInstance()), getSelf());
337+
}
338+
312339
private ImmutableMap<String, String> dimensionsToMap(final Messages.StatisticSetRecord statisticSetRecord) {
313340
// Build a map of dimension key-value pairs dropping any that are to be reaggregated over
314341
final ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
@@ -356,6 +383,9 @@ private String createHost() {
356383
private final Set<Statistic> _statistics = Sets.newHashSet();
357384
private final Duration _aggregatorTimeout;
358385
private boolean _initialized = false;
386+
// This actor is _live if it's received data since the last LIVELINESS_CHECK_MSG.
387+
// If this is ever false during a check, the actor will shutdown.
388+
private boolean _live = false;
359389
private Duration _period;
360390
private String _cluster;
361391
private String _metric;
@@ -367,6 +397,9 @@ private String createHost() {
367397
|| entry.getKey().equals(CombinedMetricData.HOST_KEY)
368398
|| entry.getKey().equals(CombinedMetricData.SERVICE_KEY));
369399
private static final String BUCKET_CHECK_TIMER_KEY = "bucketcheck";
400+
private static final String LIVELINESS_CHECK_TIMER = "livelinesscheck";
401+
402+
private static final String LIVELINESS_CHECK_MSG = "LIVELINESS_CHECK_MSG";
370403

371404
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingAggregator.class);
372405
private static final Logger WORK_TOO_OLD_LOGGER = LoggerFactory.getRateLimitLogger(

src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import com.google.common.collect.ImmutableList;
2626
import com.google.common.collect.ImmutableSet;
2727
import com.google.common.collect.Maps;
28+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2829
import net.sf.oval.constraint.NotEmpty;
2930
import net.sf.oval.constraint.NotNull;
3031
import net.sf.oval.constraint.Range;
32+
import net.sf.oval.constraint.ValidateWithMethod;
3133

3234
import java.io.File;
3335
import java.time.Duration;
@@ -132,6 +134,10 @@ public Duration getReaggregationTimeout() {
132134
return _reaggregationTimeout;
133135
}
134136

137+
public Duration getAggregatorLivelinessTimeout() {
138+
return _aggregatorLivelinessTimeout;
139+
}
140+
135141
public RebalanceConfiguration getRebalanceConfiguration() {
136142
return _rebalanceConfiguration;
137143
}
@@ -180,6 +186,7 @@ public String toString() {
180186
.add("ReaggregationDimensions", _reaggregationDimensions)
181187
.add("ReaggregationInjectClusterAsHost", _reaggregationInjectClusterAsHost)
182188
.add("ReaggregationTimeout", _reaggregationTimeout)
189+
.add("AggregatorLivelinessTimeout", _aggregatorLivelinessTimeout)
183190
.add("MinConnectionTimeout", _minConnectionTimeout)
184191
.add("MaxConnectionTimeout", _maxConnectionTimeout)
185192
.add("RebalanceConfiguration", _rebalanceConfiguration)
@@ -208,6 +215,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
208215
_reaggregationDimensions = builder._reaggregationDimensions;
209216
_reaggregationInjectClusterAsHost = builder._reaggregationInjectClusterAsHost;
210217
_reaggregationTimeout = builder._reaggregationTimeout;
218+
_aggregatorLivelinessTimeout = builder._aggregatorLivelinessTimeout;
211219
_minConnectionTimeout = builder._minConnectionTimeout;
212220
_maxConnectionTimeout = builder._maxConnectionTimeout;
213221
_jvmMetricsCollectionInterval = builder._jvmMetricsCollectionInterval;
@@ -236,6 +244,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
236244
private final ImmutableSet<String> _reaggregationDimensions;
237245
private final boolean _reaggregationInjectClusterAsHost;
238246
private final Duration _reaggregationTimeout;
247+
private final Duration _aggregatorLivelinessTimeout;
239248
private final Duration _minConnectionTimeout;
240249
private final Duration _maxConnectionTimeout;
241250
private final Duration _jvmMetricsCollectionInterval;
@@ -540,6 +549,26 @@ public Builder setReaggregationTimeout(final Duration value) {
540549
return this;
541550
}
542551

552+
/**
553+
* How often an aggregator actor should check for liveliness. An actor is considered live
554+
* if any data is received between subsequent checks.
555+
*
556+
* This control is useful for culling aggregator instances for very infrequently occurring
557+
* dimension sets, especially if the application is long-lived.
558+
*
559+
* This must be greater than the reaggregation timeout, otherwise an actor could be
560+
* incorrectly marked as stale before flushing its data.
561+
*
562+
* Optional. Defaults to two minutes (twice the default reaggregation timeout); the default is
* not recomputed if the reaggregation timeout is changed. Cannot be null.
563+
*
564+
* @param value Interval between liveliness checks; must be greater than the reaggregation timeout.
565+
* @return This instance of {@link Builder}.
566+
*/
567+
public Builder setAggregatorLivelinessTimeout(final Duration value) {
568+
_aggregatorLivelinessTimeout = value;
569+
return this;
570+
}
571+
543572
/**
544573
* Configuration for the shard rebalance settings.
545574
*
@@ -574,6 +603,11 @@ public Builder setCalculateClusterAggregations(final Boolean value) {
574603
return this;
575604
}
576605

606+
@SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", justification = "invoked reflectively by @ValidateWithMethod")
607+
public boolean validateAggregatorLivelinessTimeout(final Duration aggregatorLivelinessTimeout) {
608+
return aggregatorLivelinessTimeout.compareTo(_reaggregationTimeout) < 0;
609+
}
610+
577611
@NotNull
578612
@NotEmpty
579613
private String _monitoringCluster;
@@ -621,6 +655,9 @@ public Builder setCalculateClusterAggregations(final Boolean value) {
621655
@NotNull
622656
private Duration _reaggregationTimeout = Duration.ofMinutes(1);
623657
@NotNull
658+
@ValidateWithMethod(methodName = "validateAggregatorLivelinessTimeout", parameterType=Duration.class)
659+
private Duration _aggregatorLivelinessTimeout = _reaggregationTimeout.multipliedBy(2);
660+
@NotNull
624661
private File _hostPipelineConfiguration;
625662
@NotNull
626663
private Map<String, ?> _akkaConfiguration;

src/test/java/com/arpnetworking/clusteraggregator/aggregation/AggregatorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public ActorRef createAggregator(final TestProbe probe) {
5959
"",
6060
ImmutableSet.of(),
6161
true,
62-
Duration.ofMinutes(1)),
62+
Duration.ofMinutes(1),
63+
Duration.ofMinutes(2)),
6364
probe.ref(),
6465
"agg",
6566
getSystem());
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2015 Groupon.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.clusteraggregator.aggregation;
17+
18+
import akka.actor.ActorRef;
19+
import akka.actor.ReceiveTimeout;
20+
import akka.actor.Terminated;
21+
import akka.cluster.sharding.ShardRegion;
22+
import akka.testkit.TestActorRef;
23+
import akka.testkit.TestProbe;
24+
import com.arpnetworking.metrics.aggregation.protocol.Messages;
25+
import com.arpnetworking.utility.BaseActorTest;
26+
import com.google.common.collect.ImmutableSet;
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
import scala.concurrent.duration.FiniteDuration;
30+
31+
import java.time.Duration;
32+
import java.util.concurrent.TimeUnit;
33+
34+
/**
35+
* Tests for the {@link StreamingAggregator} actor.
36+
*
37+
* @author Christian Briones (cbriones at dropbox dot com)
38+
*/
39+
public class StreamingAggregatorTest extends BaseActorTest {
40+
41+
@Test
42+
public void passivatesProperlyWhenLivelinessTimeoutIsExceeded() {
43+
final TestProbe probe = TestProbe.apply(getSystem());
44+
final ActorRef aggregator = createAggregator(probe);
45+
probe.watch(aggregator);
46+
47+
// We expect a passivate message once LIVELINESS_TIMEOUT_SEC seconds pass without any data.
48+
final ShardRegion.Passivate passivate = probe.expectMsgClass(PROBE_TIMEOUT, ShardRegion.Passivate.class);
49+
aggregator.tell(passivate.stopMessage(), aggregator);
50+
final Terminated terminated = probe.expectMsgClass(Terminated.class);
51+
Assert.assertEquals(aggregator, terminated.getActor());
52+
}
53+
54+
@Test
55+
public void dataKeepsActorLive() {
56+
final TestProbe probe = TestProbe.apply(getSystem());
57+
final ActorRef aggregator = createAggregator(probe);
58+
probe.watch(aggregator);
59+
60+
final Messages.StatisticSetRecord record = Messages.StatisticSetRecord.newBuilder()
61+
.setMetric("my_metric")
62+
.setPeriod("PT1M")
63+
.setPeriodStart("2020-12-10T19:00:00Z")
64+
.build();
65+
66+
aggregator.tell(record, probe.ref());
67+
probe.expectNoMessage(FiniteDuration.apply(LIVELINESS_TIMEOUT_SEC, TimeUnit.SECONDS));
68+
69+
aggregator.tell(ReceiveTimeout.getInstance(), aggregator);
70+
final ShardRegion.Passivate passivate = probe.expectMsgClass(PROBE_TIMEOUT, ShardRegion.Passivate.class);
71+
aggregator.tell(passivate.stopMessage(), aggregator);
72+
final Terminated terminated = probe.expectMsgClass(Terminated.class);
73+
Assert.assertEquals(aggregator, terminated.getActor());
74+
}
75+
76+
public ActorRef createAggregator(final TestProbe probe) {
77+
final TestProbe ignored = TestProbe.apply(getSystem());
78+
return TestActorRef.apply(
79+
StreamingAggregator.props(
80+
ignored.ref(),
81+
ignored.ref(),
82+
"",
83+
ImmutableSet.of(),
84+
true,
85+
Duration.ofSeconds(5),
86+
Duration.ofSeconds(LIVELINESS_TIMEOUT_SEC)),
87+
probe.ref(),
88+
"agg",
89+
getSystem());
90+
}
91+
92+
private static final int LIVELINESS_TIMEOUT_SEC = 5;
93+
private static final FiniteDuration PROBE_TIMEOUT = FiniteDuration.apply(10, TimeUnit.SECONDS);
94+
}

0 commit comments

Comments
 (0)