Skip to content

Commit c48cfe9

Browse files
authored
Remove the book-keeper since it does not work for non-reaggregation case and the messages are no longer java serializable. Also improve logging in the streaming aggregator. Finally, support configuration of the streaming aggregator timeout. (#94)
1 parent 2fd1300 commit c48cfe9

File tree

17 files changed

+155
-825
lines changed

17 files changed

+155
-825
lines changed

pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -627,11 +627,6 @@
627627
<artifactId>akka-cluster_${scala.version}</artifactId>
628628
<version>${akka.version}</version>
629629
</dependency>
630-
<dependency>
631-
<groupId>com.typesafe.akka</groupId>
632-
<artifactId>akka-cluster-tools_${scala.version}</artifactId>
633-
<version>${akka.version}</version>
634-
</dependency>
635630
<dependency>
636631
<groupId>com.typesafe.akka</groupId>
637632
<artifactId>akka-remote_${scala.version}</artifactId>

src/main/java/com/arpnetworking/clusteraggregator/AggregatorLifecycle.java

Lines changed: 0 additions & 99 deletions
This file was deleted.

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@
2222
import akka.cluster.Cluster;
2323
import akka.cluster.sharding.ClusterSharding;
2424
import akka.cluster.sharding.ClusterShardingSettings;
25-
import akka.cluster.singleton.ClusterSingletonManager;
26-
import akka.cluster.singleton.ClusterSingletonManagerSettings;
27-
import akka.cluster.singleton.ClusterSingletonProxy;
28-
import akka.cluster.singleton.ClusterSingletonProxySettings;
2925
import akka.http.javadsl.ConnectHttp;
3026
import akka.http.javadsl.Http;
3127
import akka.http.javadsl.IncomingConnection;
@@ -36,8 +32,6 @@
3632
import akka.stream.Materializer;
3733
import com.arpnetworking.clusteraggregator.aggregation.AggMessageExtractor;
3834
import com.arpnetworking.clusteraggregator.aggregation.AggregationRouter;
39-
import com.arpnetworking.clusteraggregator.aggregation.Bookkeeper;
40-
import com.arpnetworking.clusteraggregator.bookkeeper.persistence.InMemoryBookkeeper;
4135
import com.arpnetworking.clusteraggregator.client.AggClientServer;
4236
import com.arpnetworking.clusteraggregator.client.AggClientSupervisor;
4337
import com.arpnetworking.clusteraggregator.client.HttpSourceActor;
@@ -79,6 +73,7 @@
7973
import com.typesafe.config.ConfigParseOptions;
8074
import com.typesafe.config.ConfigSyntax;
8175
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
76+
import org.joda.time.Period;
8277

8378
import java.io.File;
8479
import java.io.IOException;
@@ -210,35 +205,17 @@ private ActorRef launchEmitter(final Injector injector, final ActorSystem system
210205
return emitterConfigurationProxy;
211206
}
212207

213-
@Provides
214-
@Singleton
215-
@Named("bookkeeper-proxy")
216-
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
217-
private ActorRef provideBookkeeperProxy(final ActorSystem system) {
218-
system.actorOf(
219-
ClusterSingletonManager.props(
220-
Bookkeeper.props(new InMemoryBookkeeper()),
221-
PoisonPill.getInstance(),
222-
ClusterSingletonManagerSettings.create(system)),
223-
"bookkeeper");
224-
225-
return system.actorOf(
226-
ClusterSingletonProxy.props("/user/bookkeeper", ClusterSingletonProxySettings.create(system)),
227-
"bookkeeperProxy");
228-
}
229-
230208
@Provides
231209
@Singleton
232210
@Named("status-cache")
233211
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
234212
private ActorRef provideStatusCache(
235213
final ActorSystem system,
236-
@Named("bookkeeper-proxy") final ActorRef bookkeeperProxy,
237214
@Named("periodic-statistics") final ActorRef periodicStats,
238215
final MetricsFactory metricsFactory) {
239216
final Cluster cluster = Cluster.get(system);
240217
final ActorRef clusterStatusCache = system.actorOf(ClusterStatusCache.props(cluster, metricsFactory), "cluster-status");
241-
return system.actorOf(Status.props(bookkeeperProxy, cluster, clusterStatusCache, periodicStats), "status");
218+
return system.actorOf(Status.props(cluster, clusterStatusCache, periodicStats), "status");
242219
}
243220

244221
@Provides
@@ -249,18 +226,6 @@ private ActorRef provideTcpServer(final Injector injector, final ActorSystem sys
249226
return system.actorOf(GuiceActorCreator.props(injector, AggClientServer.class), "tcp-server");
250227
}
251228

252-
@Provides
253-
@Singleton
254-
@Named("aggregator-lifecycle")
255-
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
256-
private ActorRef provideAggregatorLifecycleTracker(
257-
final ActorSystem system,
258-
@Named("bookkeeper-proxy") final ActorRef bookkeeperProxy) {
259-
final ActorRef aggLifecycle = system.actorOf(AggregatorLifecycle.props(), "agg-lifecycle");
260-
aggLifecycle.tell(new AggregatorLifecycle.Subscribe(bookkeeperProxy), bookkeeperProxy);
261-
return aggLifecycle;
262-
}
263-
264229
@Provides
265230
@Singleton
266231
@Named("http-server")
@@ -364,6 +329,13 @@ private boolean provideReaggregationInjectClusterAsHost(final ClusterAggregatorC
364329
return config.getReaggregationInjectClusterAsHost();
365330
}
366331

332+
@Provides
333+
@Named("reaggregation-timeout")
334+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
335+
private Period provideReaggregationTimeout(final ClusterAggregatorConfiguration config) {
336+
return config.getReaggregationTimeout();
337+
}
338+
367339
@Provides
368340
@Named("circonus-partition-set")
369341
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice

src/main/java/com/arpnetworking/clusteraggregator/Main.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,6 @@ private void launchActors(final Injector injector) {
172172
injector.getInstance(Key.get(ActorRef.class, Names.named("host-emitter")));
173173
injector.getInstance(Key.get(ActorRef.class, Names.named("cluster-emitter")));
174174

175-
LOGGER.info()
176-
.setMessage("Launching bookkeeper singleton and proxy")
177-
.log();
178-
injector.getInstance(Key.get(ActorRef.class, Names.named("bookkeeper-proxy")));
179-
180-
injector.getInstance(Key.get(ActorRef.class, Names.named("aggregator-lifecycle")));
181175
injector.getInstance(Key.get(ActorRef.class, Names.named("periodic-statistics")));
182176

183177
LOGGER.info()
@@ -231,9 +225,13 @@ private void shutdownAkka() {
231225
LOGGER.info()
232226
.setMessage("Stopping Akka")
233227
.log();
234-
_shutdownActor.tell(GracefulShutdownActor.Shutdown.instance(), ActorRef.noSender());
228+
if (_shutdownActor != null) {
229+
_shutdownActor.tell(GracefulShutdownActor.Shutdown.instance(), ActorRef.noSender());
230+
}
235231
try {
236-
Await.result(_system.whenTerminated(), SHUTDOWN_TIMEOUT);
232+
if (_system != null) {
233+
Await.result(_system.whenTerminated(), SHUTDOWN_TIMEOUT);
234+
}
237235
// CHECKSTYLE.OFF: IllegalCatch - Prevent program shutdown
238236
} catch (final Exception e) {
239237
// CHECKSTYLE.ON: IllegalCatch

src/main/java/com/arpnetworking/clusteraggregator/Status.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import akka.cluster.MemberStatus;
2424
import akka.pattern.PatternsCS;
2525
import akka.remote.AssociationErrorEvent;
26-
import com.arpnetworking.clusteraggregator.models.BookkeeperData;
2726
import com.arpnetworking.clusteraggregator.models.MetricsRequest;
2827
import com.arpnetworking.clusteraggregator.models.PeriodMetrics;
2928
import com.arpnetworking.clusteraggregator.models.StatusResponse;
@@ -53,18 +52,15 @@ public class Status extends AbstractActor {
5352
/**
5453
* Public constructor.
5554
*
56-
* @param metricsBookkeeper Where to get the status metrics from.
5755
* @param cluster The instance of the Clustering extension.
5856
* @param clusterStatusCache The actor holding the cached cluster status.
5957
* @param localMetrics The actor holding the local node metrics.
6058
*/
6159
public Status(
62-
final ActorRef metricsBookkeeper,
6360
final Cluster cluster,
6461
final ActorRef clusterStatusCache,
6562
final ActorRef localMetrics) {
6663

67-
_metricsBookkeeper = metricsBookkeeper;
6864
_cluster = cluster;
6965
_clusterStatusCache = clusterStatusCache;
7066
_localMetrics = localMetrics;
@@ -74,19 +70,17 @@ public Status(
7470
/**
7571
* Creates a <code>Props</code> for use in Akka.
7672
*
77-
* @param bookkeeper Where to get the status metrics from.
7873
* @param cluster The instance of the Clustering extension.
7974
* @param clusterStatusCache The actor holding the cached cluster status.
8075
* @param localMetrics The actor holding the local node metrics.
8176
* @return A new <code>Props</code>.
8277
*/
8378
public static Props props(
84-
final ActorRef bookkeeper,
8579
final Cluster cluster,
8680
final ActorRef clusterStatusCache,
8781
final ActorRef localMetrics) {
8882

89-
return Props.create(Status.class, bookkeeper, cluster, clusterStatusCache, localMetrics);
83+
return Props.create(Status.class, cluster, clusterStatusCache, localMetrics);
9084
}
9185

9286
@Override
@@ -116,13 +110,6 @@ public Receive createReceive() {
116110

117111
private void processStatusRequest() {
118112
// Call the bookkeeper
119-
final CompletableFuture<BookkeeperData> bookkeeperFuture = PatternsCS.ask(
120-
_metricsBookkeeper,
121-
new MetricsRequest(),
122-
Duration.ofSeconds(3))
123-
.<BookkeeperData>thenApply(new CastMapper<>())
124-
.exceptionally(new AsNullRecovery<>())
125-
.toCompletableFuture();
126113
final CompletableFuture<ClusterStatusCache.StatusResponse> clusterStateFuture =
127114
PatternsCS.ask(
128115
_clusterStatusCache,
@@ -143,11 +130,9 @@ private void processStatusRequest() {
143130

144131
PatternsCS.pipe(
145132
CompletableFuture.allOf(
146-
bookkeeperFuture.toCompletableFuture(),
147133
clusterStateFuture.toCompletableFuture(),
148134
localMetricsFuture.toCompletableFuture())
149135
.thenApply(aVoid -> new StatusResponse.Builder()
150-
.setClusterMetrics(bookkeeperFuture.getNow(null))
151136
.setClusterState(clusterStateFuture.getNow(null))
152137
.setLocalMetrics(localMetricsFuture.getNow(null))
153138
.setLocalAddress(_cluster.selfAddress())
@@ -158,7 +143,6 @@ private void processStatusRequest() {
158143

159144
private boolean _quarantined = false;
160145

161-
private final ActorRef _metricsBookkeeper;
162146
private final Cluster _cluster;
163147
private final ActorRef _clusterStatusCache;
164148
private final ActorRef _localMetrics;

src/main/java/com/arpnetworking/clusteraggregator/aggregation/AggregationRouter.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.common.collect.ImmutableSet;
2727
import com.google.inject.Inject;
2828
import com.google.inject.name.Named;
29+
import org.joda.time.Period;
2930
import scala.concurrent.duration.FiniteDuration;
3031

3132
import java.io.Serializable;
@@ -42,57 +43,57 @@ public class AggregationRouter extends AbstractActor {
4243
/**
4344
* Creates a <code>Props</code> for use in Akka.
4445
*
45-
* @param lifecycleTracker Where to register the liveliness of this aggregator.
4646
* @param metricsListener Where to send metrics about aggregation computations.
4747
* @param emitter Where to send the metrics data.
4848
* @param clusterHostSuffix The suffix to append to the hostname for cluster aggregations.
4949
* @param reaggregationDimensions The dimensions to reaggregate over.
5050
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
51+
* @param aggregatorTimeout The time to wait from the start of the period for all data.
5152
* @return A new <code>Props</code>.
5253
*/
5354
public static Props props(
54-
final ActorRef lifecycleTracker,
5555
final ActorRef metricsListener,
5656
final ActorRef emitter,
5757
final String clusterHostSuffix,
5858
final ImmutableSet<String> reaggregationDimensions,
59-
final boolean injectClusterAsHost) {
59+
final boolean injectClusterAsHost,
60+
final Period aggregatorTimeout) {
6061
return Props.create(
6162
AggregationRouter.class,
62-
lifecycleTracker,
6363
metricsListener,
6464
emitter,
6565
clusterHostSuffix,
6666
reaggregationDimensions,
67-
injectClusterAsHost);
67+
injectClusterAsHost,
68+
aggregatorTimeout);
6869
}
6970

7071
/**
7172
* Public constructor.
7273
*
73-
* @param lifecycleTracker Where to register the liveliness of this aggregator.
7474
* @param periodicStatistics Where to send metrics about aggregation computations.
7575
* @param emitter Where to send the metrics data.
7676
* @param clusterHostSuffix The suffix to append to the hostname for cluster aggregations.
7777
* @param reaggregationDimensions The dimensions to reaggregate over.
7878
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
79+
* @param aggregatorTimeout The time to wait from the start of the period for all data.
7980
*/
8081
@Inject
8182
public AggregationRouter(
82-
@Named("bookkeeper-proxy") final ActorRef lifecycleTracker,
8383
@Named("periodic-statistics") final ActorRef periodicStatistics,
8484
@Named("cluster-emitter") final ActorRef emitter,
8585
@Named("cluster-host-suffix") final String clusterHostSuffix,
8686
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
87-
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost) {
87+
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost,
88+
@Named("reaggregation-timeout") final Period aggregatorTimeout) {
8889
_streamingChild = context().actorOf(
8990
StreamingAggregator.props(
90-
lifecycleTracker,
9191
periodicStatistics,
9292
emitter,
9393
clusterHostSuffix,
9494
reaggregationDimensions,
95-
injectClusterAsHost),
95+
injectClusterAsHost,
96+
aggregatorTimeout),
9697
"streaming");
9798
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
9899
}

0 commit comments

Comments
 (0)