Skip to content

Commit 745dbfc

Browse files
authored
Add start/stop/restart metrics to streaming aggregator actor. (#141)
* Add start/stop/restart metrics to streaming aggregator actor. * Removed double semi-colon. * Increase test timeout.
1 parent 1981ab3 commit 745dbfc

File tree

9 files changed

+189
-11
lines changed

9 files changed

+189
-11
lines changed

pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
<logback.steno.version>1.18.0</logback.steno.version>
134134
<log4j.over.slf4j.version>1.7.22</log4j.over.slf4j.version>
135135
<metrics.client.version>0.11.3</metrics.client.version>
136+
<metrics.client.incubator.version>0.11.2</metrics.client.incubator.version>
136137
<metrics.jvm.extra.version>0.11.2</metrics.jvm.extra.version>
137138
<metrics.http.extra.version>0.11.2</metrics.http.extra.version>
138139
<metrics.aggregator.protocol.version>1.0.10</metrics.aggregator.protocol.version>
@@ -583,6 +584,18 @@
583584
</exclusion>
584585
</exclusions>
585586
</dependency>
587+
<dependency>
588+
<groupId>com.arpnetworking.metrics.extras</groupId>
589+
<artifactId>incubator-extra</artifactId>
590+
<version>${metrics.client.incubator.version}</version>
591+
<exclusions>
592+
<!-- TODO(ville): Remove suppression once findbugs to spotbugs migration is complete -->
593+
<exclusion>
594+
<groupId>com.google.code.findbugs</groupId>
595+
<artifactId>findbugs-annotations</artifactId>
596+
</exclusion>
597+
</exclusions>
598+
</dependency>
586599
<dependency>
587600
<groupId>com.arpnetworking.metrics.extras</groupId>
588601
<artifactId>jvm-extra</artifactId>
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2017 Inscope Metrics, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.clusteraggregator;
17+
18+
import com.arpnetworking.steno.Logger;
19+
import com.arpnetworking.steno.LoggerFactory;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CompletionStage;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.function.Supplier;
26+
27+
/**
28+
* Assists Guice-instantiated classes in shutting down.
29+
*
30+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot io)
31+
*/
32+
public class AppShutdown implements LifecycleRegistration {
33+
/**
34+
* {@inheritDoc}
35+
*/
36+
@Override
37+
public void registerShutdown(final Supplier<CompletionStage<Void>> callback) {
38+
_shutdownCallbacks.offer(callback);
39+
}
40+
41+
/**
42+
* Executes all of the registered shutdown hooks and waits for them to complete.
43+
*/
44+
public void shutdown() {
45+
final CompletableFuture<?>[] completionStages = _shutdownCallbacks.stream()
46+
.map(Supplier::get)
47+
.map(CompletionStage::toCompletableFuture)
48+
.toArray(CompletableFuture[]::new);
49+
50+
try {
51+
CompletableFuture.allOf(completionStages).get();
52+
} catch (final InterruptedException | ExecutionException e) {
53+
LOGGER.error()
54+
.setMessage("Exception in application shutdown")
55+
.setThrowable(e)
56+
.log();
57+
}
58+
}
59+
60+
private final ConcurrentLinkedQueue<Supplier<CompletionStage<Void>>> _shutdownCallbacks = new ConcurrentLinkedQueue<>();
61+
62+
private static final Logger LOGGER = LoggerFactory.getLogger(AppShutdown.class);
63+
}

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import com.arpnetworking.metrics.Sink;
5555
import com.arpnetworking.metrics.impl.ApacheHttpSink;
5656
import com.arpnetworking.metrics.impl.TsdMetricsFactory;
57+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
58+
import com.arpnetworking.metrics.incubator.impl.TsdPeriodicMetrics;
5759
import com.arpnetworking.utility.ActorConfigurator;
5860
import com.arpnetworking.utility.ConfiguredLaunchableFactory;
5961
import com.arpnetworking.utility.Database;
@@ -86,7 +88,11 @@
8688
import java.util.Locale;
8789
import java.util.Map;
8890
import java.util.Optional;
91+
import java.util.concurrent.CompletableFuture;
8992
import java.util.concurrent.CompletionStage;
93+
import java.util.concurrent.Executors;
94+
import java.util.concurrent.ScheduledExecutorService;
95+
import java.util.concurrent.TimeUnit;
9096
import javax.annotation.Nullable;
9197

9298
/**
@@ -100,14 +106,17 @@ public class GuiceModule extends AbstractModule {
100106
* Public constructor.
101107
*
102108
* @param configuration The configuration.
109+
* @param shutdown The shutdown hook.
103110
*/
104-
public GuiceModule(final ClusterAggregatorConfiguration configuration) {
111+
public GuiceModule(final ClusterAggregatorConfiguration configuration, final LifecycleRegistration shutdown) {
105112
_configuration = configuration;
113+
_shutdown = shutdown;
106114
}
107115

108116
@Override
109117
protected void configure() {
110118
bind(ClusterAggregatorConfiguration.class).toInstance(_configuration);
119+
bind(LifecycleRegistration.class).toInstance(_shutdown);
111120

112121
for (final Map.Entry<String, DatabaseConfiguration> entry : _configuration.getDatabaseConfigurations().entrySet()) {
113122
bind(Database.class)
@@ -314,6 +323,24 @@ private ActorRef provideJvmMetricsCollector(final ActorSystem system, final Metr
314323
return system.actorOf(JvmMetricsCollector.props(_configuration.getJvmMetricsCollectionInterval(), metricsFactory));
315324
}
316325

326+
@Provides
327+
@Singleton
328+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
329+
private PeriodicMetrics providePeriodicMetrics(final MetricsFactory metricsFactory, final LifecycleRegistration lifecycle) {
330+
final TsdPeriodicMetrics periodicMetrics = new TsdPeriodicMetrics.Builder()
331+
.setMetricsFactory(metricsFactory)
332+
.build();
333+
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
334+
r -> new Thread(r, "PeriodicMetricsCloser"));
335+
final long offsetMillis = 1250 - (System.currentTimeMillis() % 1000);
336+
executor.scheduleAtFixedRate(periodicMetrics, offsetMillis, 1000, TimeUnit.MILLISECONDS);
337+
lifecycle.registerShutdown(() -> {
338+
executor.shutdown();
339+
return CompletableFuture.completedFuture(null);
340+
});
341+
return periodicMetrics;
342+
}
343+
317344
@Provides
318345
@Singleton
319346
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
@@ -414,6 +441,7 @@ static List<Sink> createSinks(final ImmutableList<JsonNode> monitoringSinks) {
414441
}
415442

416443
private final ClusterAggregatorConfiguration _configuration;
444+
private final LifecycleRegistration _shutdown;
417445

418446
private static final String HOCON_FILE_EXTENSION = ".conf";
419447
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2017 Inscope Metrics, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.clusteraggregator;
17+
18+
import java.util.concurrent.CompletionStage;
19+
import java.util.function.Supplier;
20+
21+
/**
22+
* Provides a way to register for lifecycle events, ensuring that Guice-instantiated classes can be notified.
23+
*
24+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot io)
25+
*/
26+
public interface LifecycleRegistration {
27+
/**
28+
* Registers a callback to be invoked when the application is shutting down. The application
29+
* waits for the CompletionStage returned by every registered callback to complete before it finishes shutting down.
30+
*
31+
* @param callback The callback to register
32+
*/
33+
void registerShutdown(Supplier<CompletionStage<Void>> callback);
34+
}

src/main/java/com/arpnetworking/clusteraggregator/Main.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,12 @@ public synchronized void launch() {
149149
public synchronized void shutdown() {
150150
shutdownAkka();
151151
shutdownDatabases();
152+
shutdownGuice();
152153
}
153154

154155
private Injector launchGuice() {
155-
return Guice.createInjector(new GuiceModule(_configuration));
156+
_guiceAppShutdown = new AppShutdown();
157+
return Guice.createInjector(new GuiceModule(_configuration, _guiceAppShutdown));
156158
}
157159

158160
private void launchDatabases(final Injector injector) {
@@ -211,6 +213,13 @@ private void launchAkka(final Injector injector) {
211213
_system = injector.getInstance(ActorSystem.class);
212214
}
213215

216+
private void shutdownGuice() {
217+
LOGGER.info().setMessage("Stopping guice").log();
218+
if (_guiceAppShutdown != null) {
219+
_guiceAppShutdown.shutdown();
220+
}
221+
}
222+
214223
private void shutdownDatabases() {
215224
for (final Database database : _databases) {
216225
LOGGER.info()
@@ -259,6 +268,7 @@ private static Builder<? extends JsonNodeSource> getFileSourceBuilder(
259268

260269
private volatile ActorSystem _system;
261270
private volatile ActorRef _shutdownActor;
271+
private volatile AppShutdown _guiceAppShutdown;
262272
private volatile List<Database> _databases;
263273

264274
private static final Logger LOGGER = com.arpnetworking.steno.LoggerFactory.getLogger(Main.class);

src/main/java/com/arpnetworking/clusteraggregator/aggregation/AggregationRouter.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import akka.actor.ReceiveTimeout;
2222
import akka.cluster.sharding.ShardRegion;
2323
import com.arpnetworking.metrics.aggregation.protocol.Messages;
24+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
2425
import com.arpnetworking.steno.Logger;
2526
import com.arpnetworking.steno.LoggerFactory;
2627
import com.google.common.collect.ImmutableSet;
@@ -49,6 +50,7 @@ public class AggregationRouter extends AbstractActor {
4950
* @param reaggregationDimensions The dimensions to reaggregate over.
5051
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
5152
* @param aggregatorTimeout The time to wait from the start of the period for all data.
53+
* @param periodicMetrics The {@link PeriodicMetrics} instance.
5254
* @return A new {@link Props}.
5355
*/
5456
public static Props props(
@@ -57,15 +59,17 @@ public static Props props(
5759
final String clusterHostSuffix,
5860
final ImmutableSet<String> reaggregationDimensions,
5961
final boolean injectClusterAsHost,
60-
final Duration aggregatorTimeout) {
62+
final Duration aggregatorTimeout,
63+
final PeriodicMetrics periodicMetrics) {
6164
return Props.create(
6265
AggregationRouter.class,
6366
metricsListener,
6467
emitter,
6568
clusterHostSuffix,
6669
reaggregationDimensions,
6770
injectClusterAsHost,
68-
aggregatorTimeout);
71+
aggregatorTimeout,
72+
periodicMetrics);
6973
}
7074

7175
/**
@@ -77,6 +81,7 @@ public static Props props(
7781
* @param reaggregationDimensions The dimensions to reaggregate over.
7882
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
7983
* @param aggregatorTimeout The time to wait from the start of the period for all data.
84+
* @param periodicMetrics The {@link PeriodicMetrics} instance.
8085
*/
8186
@Inject
8287
public AggregationRouter(
@@ -85,15 +90,17 @@ public AggregationRouter(
8590
@Named("cluster-host-suffix") final String clusterHostSuffix,
8691
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
8792
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost,
88-
@Named("reaggregation-timeout") final Duration aggregatorTimeout) {
93+
@Named("reaggregation-timeout") final Duration aggregatorTimeout,
94+
final PeriodicMetrics periodicMetrics) {
8995
_streamingChild = context().actorOf(
9096
StreamingAggregator.props(
9197
periodicStatistics,
9298
emitter,
9399
clusterHostSuffix,
94100
reaggregationDimensions,
95101
injectClusterAsHost,
96-
aggregatorTimeout),
102+
aggregatorTimeout,
103+
periodicMetrics),
97104
"streaming");
98105
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
99106
}

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import akka.cluster.sharding.ShardRegion;
2323
import com.arpnetworking.clusteraggregator.models.CombinedMetricData;
2424
import com.arpnetworking.metrics.aggregation.protocol.Messages;
25+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
2526
import com.arpnetworking.steno.Logger;
2627
import com.arpnetworking.steno.LoggerFactory;
2728
import com.arpnetworking.tsdcore.model.AggregatedData;
@@ -68,6 +69,7 @@ public class StreamingAggregator extends AbstractActorWithTimers {
6869
* @param reaggregationDimensions The dimensions to reaggregate over.
6970
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
7071
* @param aggregatorTimeout The time to wait from the start of the period for all data.
72+
* @param periodicMetrics The {@link PeriodicMetrics} instance.
7173
* @return A new {@link Props}.
7274
*/
7375
public static Props props(
@@ -76,15 +78,17 @@ public static Props props(
7678
final String clusterHostSuffix,
7779
final ImmutableSet<String> reaggregationDimensions,
7880
final boolean injectClusterAsHost,
79-
final Duration aggregatorTimeout) {
81+
final Duration aggregatorTimeout,
82+
final PeriodicMetrics periodicMetrics) {
8083
return Props.create(
8184
StreamingAggregator.class,
8285
metricsListener,
8386
emitter,
8487
clusterHostSuffix,
8588
reaggregationDimensions,
8689
injectClusterAsHost,
87-
aggregatorTimeout);
90+
aggregatorTimeout,
91+
periodicMetrics);
8892
}
8993

9094
/**
@@ -96,6 +100,7 @@ public static Props props(
96100
* @param reaggregationDimensions The dimensions to reaggregate over.
97101
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
98102
* @param aggregatorTimeout The time to wait from the start of the period for all data.
103+
* @param periodicMetrics The {@link PeriodicMetrics} instance.
99104
*/
100105
@Inject
101106
public StreamingAggregator(
@@ -104,12 +109,14 @@ public StreamingAggregator(
104109
@Named("cluster-host-suffix") final String clusterHostSuffix,
105110
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
106111
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost,
107-
@Named("reaggregation-timeout") final Duration aggregatorTimeout) {
112+
@Named("reaggregation-timeout") final Duration aggregatorTimeout,
113+
final PeriodicMetrics periodicMetrics) {
108114
_periodicStatistics = periodicStatistics;
109115
_clusterHostSuffix = clusterHostSuffix;
110116
_reaggregationDimensions = reaggregationDimensions;
111117
_injectClusterAsHost = injectClusterAsHost;
112118
_aggregatorTimeout = aggregatorTimeout;
119+
_periodicMetrics = periodicMetrics;
113120
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
114121

115122
timers().startPeriodicTimer(BUCKET_CHECK_TIMER_KEY, BucketCheck.getInstance(), FiniteDuration.apply(5, TimeUnit.SECONDS));
@@ -189,8 +196,19 @@ public Receive createReceive() {
189196
.build();
190197
}
191198

199+
@Override
200+
public void preStart() {
201+
_periodicMetrics.recordCounter("actors/streaming_aggregator/started", 1);
202+
}
203+
204+
@Override
205+
public void postStop() {
206+
_periodicMetrics.recordCounter("actors/streaming_aggregator/stopped", 1);
207+
}
208+
192209
@Override
193210
public void preRestart(final Throwable reason, final Optional<Object> message) throws Exception {
211+
_periodicMetrics.recordCounter("actors/streaming_aggregator/restarted", 1);
194212
LOGGER.error()
195213
.setMessage("Aggregator crashing")
196214
.setThrowable(reason)
@@ -355,6 +373,7 @@ private String createHost() {
355373
private final boolean _injectClusterAsHost;
356374
private final Set<Statistic> _statistics = Sets.newHashSet();
357375
private final Duration _aggregatorTimeout;
376+
private final PeriodicMetrics _periodicMetrics;
358377
private boolean _initialized = false;
359378
private Duration _period;
360379
private String _cluster;

0 commit comments

Comments
 (0)