@@ -68,6 +68,7 @@ public class StreamingAggregator extends AbstractActorWithTimers {
68
68
* @param reaggregationDimensions The dimensions to reaggregate over.
69
69
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
70
70
* @param aggregatorTimeout The time to wait from the start of the period for all data.
71
+ * @param livelinessTimeout How often to check that this actor is still receiving data.
71
72
* @return A new {@link Props}.
72
73
*/
73
74
public static Props props (
@@ -76,15 +77,17 @@ public static Props props(
76
77
final String clusterHostSuffix ,
77
78
final ImmutableSet <String > reaggregationDimensions ,
78
79
final boolean injectClusterAsHost ,
79
- final Duration aggregatorTimeout ) {
80
+ final Duration aggregatorTimeout ,
81
+ final Duration livelinessTimeout ) {
80
82
return Props .create (
81
83
StreamingAggregator .class ,
82
84
metricsListener ,
83
85
emitter ,
84
86
clusterHostSuffix ,
85
87
reaggregationDimensions ,
86
88
injectClusterAsHost ,
87
- aggregatorTimeout );
89
+ aggregatorTimeout ,
90
+ livelinessTimeout );
88
91
}
89
92
90
93
/**
@@ -96,6 +99,7 @@ public static Props props(
96
99
* @param reaggregationDimensions The dimensions to reaggregate over.
97
100
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
98
101
* @param aggregatorTimeout The time to wait from the start of the period for all data.
102
+ * @param livelinessTimeout How often to check that this actor is still receiving data.
99
103
*/
100
104
@ Inject
101
105
public StreamingAggregator (
@@ -104,7 +108,8 @@ public StreamingAggregator(
104
108
@ Named ("cluster-host-suffix" ) final String clusterHostSuffix ,
105
109
@ Named ("reaggregation-dimensions" ) final ImmutableSet <String > reaggregationDimensions ,
106
110
@ Named ("reaggregation-cluster-as-host" ) final boolean injectClusterAsHost ,
107
- @ Named ("reaggregation-timeout" ) final Duration aggregatorTimeout ) {
111
+ @ Named ("reaggregation-timeout" ) final Duration aggregatorTimeout ,
112
+ @ Named ("aggregator-liveliness-timeout" ) final Duration livelinessTimeout ) {
108
113
_periodicStatistics = periodicStatistics ;
109
114
_clusterHostSuffix = clusterHostSuffix ;
110
115
_reaggregationDimensions = reaggregationDimensions ;
@@ -113,6 +118,7 @@ public StreamingAggregator(
113
118
context ().setReceiveTimeout (FiniteDuration .apply (30 , TimeUnit .MINUTES ));
114
119
115
120
timers ().startPeriodicTimer (BUCKET_CHECK_TIMER_KEY , BucketCheck .getInstance (), FiniteDuration .apply (5 , TimeUnit .SECONDS ));
121
+ timers ().startPeriodicTimer (LIVELINESS_CHECK_TIMER , LIVELINESS_CHECK_MSG , livelinessTimeout );
116
122
117
123
_emitter = emitter ;
118
124
}
@@ -121,6 +127,8 @@ public StreamingAggregator(
121
127
public Receive createReceive () {
122
128
return receiveBuilder ()
123
129
.match (Messages .StatisticSetRecord .class , record -> {
130
+ // Mark this actor as live since we're still receiving data.
131
+ _live = true ;
124
132
LOGGER .debug ()
125
133
.setMessage ("Processing a StatisticSetRecord" )
126
134
.addData ("workItem" , record )
@@ -182,10 +190,25 @@ public Receive createReceive() {
182
190
}
183
191
}
184
192
})
185
- .match (ShutdownAggregator .class , message -> context ().stop (self ()))
186
- .match (ReceiveTimeout .class , message -> {
187
- getContext ().parent ().tell (new ShardRegion .Passivate (ShutdownAggregator .getInstance ()), getSelf ());
193
+ .matchEquals (LIVELINESS_CHECK_MSG , msg -> {
194
+ // If we've received data since the last check, clear the flag and wait for the next check;
195
+ // otherwise, ask the parent to shut this actor down.
196
+ if (_live ) {
197
+ LOGGER .debug ()
198
+ .setMessage ("aggregator is still live, continuing." )
199
+ .addContext ("actor" , self ())
200
+ .log ();
201
+ _live = false ;
202
+ return ;
203
+ }
204
+ LOGGER .debug ()
205
+ .setMessage ("aggregator is stale, requesting shutdown." )
206
+ .addContext ("actor" , self ())
207
+ .log ();
208
+ requestShutdownFromParent ();
188
209
})
210
+ .match (ShutdownAggregator .class , message -> context ().stop (self ()))
211
+ .match (ReceiveTimeout .class , message -> requestShutdownFromParent ())
189
212
.build ();
190
213
}
191
214
@@ -309,6 +332,10 @@ private void initialize(final Messages.StatisticSetRecord data, final CombinedMe
309
332
}
310
333
}
311
334
335
/**
 * Asks the parent of this actor to passivate it.
 *
 * Sends a {@code ShardRegion.Passivate} message wrapping the {@code ShutdownAggregator}
 * instance to the parent, with this actor as the sender. This actor stops itself when it
 * later receives a {@code ShutdownAggregator} message (see {@code createReceive}).
 */
+ private void requestShutdownFromParent () {
336
+ getContext ().parent ().tell (new ShardRegion .Passivate (ShutdownAggregator .getInstance ()), getSelf ());
337
+ }
338
+
312
339
private ImmutableMap <String , String > dimensionsToMap (final Messages .StatisticSetRecord statisticSetRecord ) {
313
340
// Build a map of dimension key-value pairs dropping any that are to be reaggregated over
314
341
final ImmutableMap .Builder <String , String > builder = ImmutableMap .builder ();
@@ -356,6 +383,9 @@ private String createHost() {
356
383
private final Set <Statistic > _statistics = Sets .newHashSet ();
357
384
private final Duration _aggregatorTimeout ;
358
385
private boolean _initialized = false ;
386
+ // This actor is _live if it's received data since the last LIVELINESS_CHECK_MSG.
387
+ // If this is false when a check fires, the actor asks its parent to shut it down.
388
+ private boolean _live = false ;
359
389
private Duration _period ;
360
390
private String _cluster ;
361
391
private String _metric ;
@@ -367,6 +397,9 @@ private String createHost() {
367
397
|| entry .getKey ().equals (CombinedMetricData .HOST_KEY )
368
398
|| entry .getKey ().equals (CombinedMetricData .SERVICE_KEY ));
369
399
private static final String BUCKET_CHECK_TIMER_KEY = "bucketcheck" ;
400
+ private static final String LIVELINESS_CHECK_TIMER = "livelinesscheck" ;
401
+
402
+ private static final String LIVELINESS_CHECK_MSG = "LIVELINESS_CHECK_MSG" ;
370
403
371
404
private static final Logger LOGGER = LoggerFactory .getLogger (StreamingAggregator .class );
372
405
private static final Logger WORK_TOO_OLD_LOGGER = LoggerFactory .getRateLimitLogger (
0 commit comments