Skip to content

Commit 4ed21ca

Browse files
author
Ville Koskela
committed
Merge branch 'master' into influxdb-sink
2 parents ff90f22 + 60a4cb2 commit 4ed21ca

File tree

2 files changed

+84
-42
lines changed

2 files changed

+84
-42
lines changed

src/main/java/com/arpnetworking/clusteraggregator/Main.java

Lines changed: 80 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
import akka.actor.ActorRef;
1919
import akka.actor.ActorSystem;
20-
import akka.http.javadsl.IncomingConnection;
21-
import akka.http.javadsl.ServerBinding;
22-
import akka.stream.javadsl.Source;
2320
import ch.qos.logback.classic.LoggerContext;
2421
import com.arpnetworking.clusteraggregator.configuration.ClusterAggregatorConfiguration;
2522
import com.arpnetworking.configuration.jackson.DynamicConfiguration;
@@ -30,6 +27,8 @@
3027
import com.arpnetworking.utility.Database;
3128
import com.arpnetworking.utility.Launchable;
3229
import com.fasterxml.jackson.databind.ObjectMapper;
30+
import com.google.common.base.Optional;
31+
import com.google.common.base.Throwables;
3332
import com.google.common.collect.Lists;
3433
import com.google.inject.Guice;
3534
import com.google.inject.Injector;
@@ -38,11 +37,11 @@
3837
import com.google.inject.name.Names;
3938
import org.slf4j.LoggerFactory;
4039
import scala.concurrent.Await;
41-
import scala.concurrent.Future;
4240
import scala.concurrent.duration.Duration;
4341

4442
import java.io.File;
4543
import java.util.List;
44+
import java.util.concurrent.Semaphore;
4645
import java.util.concurrent.TimeUnit;
4746

4847
/**
@@ -51,21 +50,18 @@
5150
* @author Brandon Arp (barp at groupon dot com)
5251
*/
5352
public final class Main implements Launchable {
54-
5553
/**
5654
* Entry point.
5755
*
5856
* @param args command line arguments
5957
*/
6058
public static void main(final String[] args) {
61-
LOGGER.info()
62-
.setMessage("Launching cluster-aggregator")
63-
.log();
64-
6559
Thread.setDefaultUncaughtExceptionHandler(
6660
(thread, throwable) -> {
67-
System.err.println("Unhandled exception! exception: " + throwable.toString());
68-
throwable.printStackTrace(System.err);
61+
LOGGER.error()
62+
.setMessage("Unhandled exception!")
63+
.setThrowable(throwable)
64+
.log();
6965
});
7066

7167
Thread.currentThread().setUncaughtExceptionHandler(
@@ -77,6 +73,12 @@ public static void main(final String[] args) {
7773
}
7874
);
7975

76+
LOGGER.info()
77+
.setMessage("Launching cluster-aggregator")
78+
.log();
79+
80+
Runtime.getRuntime().addShutdownHook(SHUTDOWN_THREAD);
81+
8082
if (args.length != 1) {
8183
throw new RuntimeException("No configuration file specified");
8284
}
@@ -86,32 +88,37 @@ public static void main(final String[] args) {
8688
.addData("file", args[0])
8789
.log();
8890

89-
final File configurationFile = new File(args[0]);
90-
final Configurator<Main, ClusterAggregatorConfiguration> configurator =
91-
new Configurator<>(Main::new, ClusterAggregatorConfiguration.class);
92-
final ObjectMapper objectMapper = ClusterAggregatorConfiguration.createObjectMapper();
93-
final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
94-
.setObjectMapper(objectMapper)
95-
.addSourceBuilder(
96-
new JsonNodeFileSource.Builder().setObjectMapper(objectMapper)
97-
.setFile(configurationFile))
98-
.addTrigger(new FileTrigger.Builder().setFile(configurationFile).build())
99-
.addListener(configurator)
100-
.build();
101-
102-
configuration.launch();
103-
104-
Runtime.getRuntime().addShutdownHook(
105-
new Thread(
106-
() -> {
107-
configuration.shutdown();
108-
configurator.shutdown();
109-
LOGGER.info()
110-
.setMessage("Stopping cluster-aggregator")
111-
.log();
112-
final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
113-
context.stop();
114-
}));
91+
Optional<DynamicConfiguration> configuration = Optional.absent();
92+
Optional<Configurator<Main, ClusterAggregatorConfiguration>> configurator = Optional.absent();
93+
try {
94+
final File configurationFile = new File(args[0]);
95+
configurator = Optional.of(new Configurator<>(Main::new, ClusterAggregatorConfiguration.class));
96+
final ObjectMapper objectMapper = ClusterAggregatorConfiguration.createObjectMapper();
97+
configuration = Optional.of(new DynamicConfiguration.Builder()
98+
.setObjectMapper(objectMapper)
99+
.addSourceBuilder(
100+
new JsonNodeFileSource.Builder().setObjectMapper(objectMapper)
101+
.setFile(configurationFile))
102+
.addTrigger(new FileTrigger.Builder().setFile(configurationFile).build())
103+
.addListener(configurator.get())
104+
.build());
105+
106+
configuration.get().launch();
107+
108+
// Wait for application shutdown
109+
SHUTDOWN_SEMAPHORE.acquire();
110+
} catch (final InterruptedException e) {
111+
throw Throwables.propagate(e);
112+
} finally {
113+
if (configurator.isPresent()) {
114+
configurator.get().shutdown();
115+
}
116+
if (configuration.isPresent()) {
117+
configuration.get().shutdown();
118+
}
119+
// Notify the shutdown that we're done
120+
SHUTDOWN_SEMAPHORE.release();
121+
}
115122
}
116123

117124
/**
@@ -245,6 +252,41 @@ private void shutdownAkka() {
245252
private static final Logger LOGGER = com.arpnetworking.steno.LoggerFactory.getLogger(Main.class);
246253
private static final Duration SHUTDOWN_TIMEOUT = Duration.create(3, TimeUnit.MINUTES);
247254
private static final SourceTypeLiteral SOURCE_TYPE_LITERAL = new SourceTypeLiteral();
255+
private static final Semaphore SHUTDOWN_SEMAPHORE = new Semaphore(0);
256+
private static final Thread SHUTDOWN_THREAD = new ShutdownThread();
257+
258+
private static final class ShutdownThread extends Thread {
259+
private ShutdownThread() {
260+
super("ClusterAggregatorShutdownHook");
261+
}
262+
263+
@Override
264+
public void run() {
265+
LOGGER.info()
266+
.setMessage("Stopping cluster-aggregator")
267+
.log();
268+
269+
// release the main thread waiting for shutdown signal
270+
SHUTDOWN_SEMAPHORE.release();
271+
272+
try {
273+
// wait for it to signal that it has completed shutdown
274+
if (!SHUTDOWN_SEMAPHORE.tryAcquire(SHUTDOWN_TIMEOUT.toSeconds(), TimeUnit.SECONDS)) {
275+
LOGGER.warn()
276+
.setMessage("Shutdown did not complete in a timely manner")
277+
.log();
278+
}
279+
} catch (final InterruptedException e) {
280+
throw Throwables.propagate(e);
281+
} finally {
282+
LOGGER.info()
283+
.setMessage("Shutdown complete")
284+
.log();
285+
final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
286+
context.stop();
287+
}
288+
}
289+
}
248290

249-
private static class SourceTypeLiteral extends TypeLiteral<Source<IncomingConnection, Future<ServerBinding>>> {}
291+
private static class SourceTypeLiteral extends TypeLiteral<java.util.concurrent.CompletionStage<akka.http.javadsl.ServerBinding>> {}
250292
}

src/main/java/com/arpnetworking/clusteraggregator/models/PeriodMetrics.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,21 +142,21 @@ public long getStatisticsLatestPeriod() {
142142
private BloomFilter<CharSequence> createServicesBF() {
143143
return BloomFilter.create(
144144
Funnels.stringFunnel(Charsets.UTF_8),
145-
100_000,
146-
0.0001);
145+
10_000,
146+
0.001);
147147
}
148148

149149
private BloomFilter<CharSequence> createMetricsBF() {
150150
return BloomFilter.create(
151151
Funnels.stringFunnel(Charsets.UTF_8),
152-
10_000_000,
152+
1_000_000,
153153
0.001);
154154
}
155155

156156
private BloomFilter<CharSequence> createStatisticsBF() {
157157
return BloomFilter.create(
158158
Funnels.stringFunnel(Charsets.UTF_8),
159-
100_000_000,
159+
10_000_000,
160160
0.005);
161161
}
162162

0 commit comments

Comments
 (0)