Skip to content

Commit b01cf6a

Browse files
authored
Configurable Reaggregation (#85)
* Add support for dedicated HTTP endpoints (paths) which either persist or reaggregate. * Support configurable reaggregation.
1 parent 4f321bc commit b01cf6a

File tree

5 files changed

+145
-17
lines changed

5 files changed

+145
-17
lines changed

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import com.arpnetworking.utility.ParallelLeastShardAllocationStrategy;
6767
import com.arpnetworking.utility.partitioning.PartitionSet;
6868
import com.fasterxml.jackson.databind.ObjectMapper;
69+
import com.google.common.collect.ImmutableSet;
6970
import com.google.inject.AbstractModule;
7071
import com.google.inject.Injector;
7172
import com.google.inject.Key;
@@ -349,6 +350,20 @@ private String provideClusterHostSuffix(final ClusterAggregatorConfiguration con
349350
return config.getClusterHostSuffix();
350351
}
351352

353+
@Provides
354+
@Named("reaggregation-dimensions")
355+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
356+
private ImmutableSet<String> provideReaggregationDimensions(final ClusterAggregatorConfiguration config) {
357+
return config.getReaggregationDimensions();
358+
}
359+
360+
@Provides
361+
@Named("reaggregation-cluster-as-host")
362+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
363+
private boolean provideReaggregationInjectClusterAsHost(final ClusterAggregatorConfiguration config) {
364+
return config.getReaggregationInjectClusterAsHost();
365+
}
366+
352367
@Provides
353368
@Named("circonus-partition-set")
354369
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice

src/main/java/com/arpnetworking/clusteraggregator/aggregation/AggregationRouter.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.arpnetworking.metrics.aggregation.protocol.Messages;
2424
import com.arpnetworking.steno.Logger;
2525
import com.arpnetworking.steno.LoggerFactory;
26+
import com.google.common.collect.ImmutableSet;
2627
import com.google.inject.Inject;
2728
import com.google.inject.name.Named;
2829
import scala.concurrent.duration.FiniteDuration;
@@ -45,14 +46,25 @@ public class AggregationRouter extends AbstractActor {
4546
* @param metricsListener Where to send metrics about aggregation computations.
4647
* @param emitter Where to send the metrics data.
4748
* @param clusterHostSuffix The suffix to append to the hostname for cluster aggregations.
49+
* @param reaggregationDimensions The dimensions to reaggregate over.
50+
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
4851
* @return A new <code>Props</code>.
4952
*/
5053
public static Props props(
5154
final ActorRef lifecycleTracker,
5255
final ActorRef metricsListener,
5356
final ActorRef emitter,
54-
final String clusterHostSuffix) {
55-
return Props.create(AggregationRouter.class, lifecycleTracker, metricsListener, emitter, clusterHostSuffix);
57+
final String clusterHostSuffix,
58+
final ImmutableSet<String> reaggregationDimensions,
59+
final boolean injectClusterAsHost) {
60+
return Props.create(
61+
AggregationRouter.class,
62+
lifecycleTracker,
63+
metricsListener,
64+
emitter,
65+
clusterHostSuffix,
66+
reaggregationDimensions,
67+
injectClusterAsHost);
5668
}
5769

5870
/**
@@ -62,15 +74,26 @@ public static Props props(
6274
* @param periodicStatistics Where to send metrics about aggregation computations.
6375
* @param emitter Where to send the metrics data.
6476
* @param clusterHostSuffix The suffix to append to the hostname for cluster aggregations.
77+
* @param reaggregationDimensions The dimensions to reaggregate over.
78+
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
6579
*/
6680
@Inject
6781
public AggregationRouter(
6882
@Named("bookkeeper-proxy") final ActorRef lifecycleTracker,
6983
@Named("periodic-statistics") final ActorRef periodicStatistics,
7084
@Named("cluster-emitter") final ActorRef emitter,
71-
@Named("cluster-host-suffix") final String clusterHostSuffix) {
85+
@Named("cluster-host-suffix") final String clusterHostSuffix,
86+
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
87+
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost) {
7288
_streamingChild = context().actorOf(
73-
StreamingAggregator.props(lifecycleTracker, periodicStatistics, emitter, clusterHostSuffix), "streaming");
89+
StreamingAggregator.props(
90+
lifecycleTracker,
91+
periodicStatistics,
92+
emitter,
93+
clusterHostSuffix,
94+
reaggregationDimensions,
95+
injectClusterAsHost),
96+
"streaming");
7497
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
7598
}
7699

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.arpnetworking.tsdcore.statistics.Statistic;
3434
import com.google.common.collect.ImmutableList;
3535
import com.google.common.collect.ImmutableMap;
36+
import com.google.common.collect.ImmutableSet;
3637
import com.google.common.collect.Lists;
3738
import com.google.common.collect.Sets;
3839
import com.google.inject.Inject;
@@ -51,6 +52,7 @@
5152
import java.util.Set;
5253
import java.util.concurrent.TimeUnit;
5354
import java.util.function.Predicate;
55+
import javax.annotation.Nullable;
5456

5557
/**
5658
* Actual actor responsible for aggregating.
@@ -66,14 +68,25 @@ public class StreamingAggregator extends AbstractActorWithTimers {
6668
* @param metricsListener Where to send metrics about aggregation computations.
6769
* @param emitter Where to send the metrics data.
6870
* @param clusterHostSuffix The suffix to append to the hostname for cluster aggregations.
71+
* @param reaggregationDimensions The dimensions to reaggregate over.
72+
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
6973
* @return A new <code>Props</code>.
7074
*/
7175
public static Props props(
7276
final ActorRef lifecycleTracker,
7377
final ActorRef metricsListener,
7478
final ActorRef emitter,
75-
final String clusterHostSuffix) {
76-
return Props.create(StreamingAggregator.class, lifecycleTracker, metricsListener, emitter, clusterHostSuffix);
79+
final String clusterHostSuffix,
80+
final ImmutableSet<String> reaggregationDimensions,
81+
final boolean injectClusterAsHost) {
82+
return Props.create(
83+
StreamingAggregator.class,
84+
lifecycleTracker,
85+
metricsListener,
86+
emitter,
87+
clusterHostSuffix,
88+
reaggregationDimensions,
89+
injectClusterAsHost);
7790
}
7891

7992
/**
@@ -83,16 +96,22 @@ public static Props props(
8396
* @param periodicStatistics Where to send metrics about aggregation computations.
8497
* @param emitter Where to send the metrics data.
8598
* @param clusterHostSuffix The suffix to append to the hostname for cluster aggregations.
99+
* @param reaggregationDimensions The dimensions to reaggregate over.
100+
* @param injectClusterAsHost Whether to inject a host dimension based on cluster.
86101
*/
87102
@Inject
88103
public StreamingAggregator(
89104
@Named("bookkeeper-proxy") final ActorRef lifecycleTracker,
90105
@Named("periodic-statistics") final ActorRef periodicStatistics,
91106
@Named("cluster-emitter") final ActorRef emitter,
92-
@Named("cluster-host-suffix") final String clusterHostSuffix) {
107+
@Named("cluster-host-suffix") final String clusterHostSuffix,
108+
@Named("reaggregation-dimensions") final ImmutableSet<String> reaggregationDimensions,
109+
@Named("reaggregation-cluster-as-host") final boolean injectClusterAsHost) {
93110
_lifecycleTracker = lifecycleTracker;
94111
_periodicStatistics = periodicStatistics;
95112
_clusterHostSuffix = clusterHostSuffix;
113+
_reaggregationDimensions = reaggregationDimensions;
114+
_injectClusterAsHost = injectClusterAsHost;
96115
context().setReceiveTimeout(FiniteDuration.apply(30, TimeUnit.MINUTES));
97116

98117
timers().startPeriodicTimer(BUCKET_CHECK_TIMER_KEY, BucketCheck.getInstance(), FiniteDuration.apply(5, TimeUnit.SECONDS));
@@ -284,20 +303,39 @@ private void processAggregationMessage(final Messages.StatisticSetRecord data) {
284303
}
285304

286305
private ImmutableMap<String, String> dimensionsToMap(final Messages.StatisticSetRecord statisticSetRecord) {
287-
final ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
288-
.put(CombinedMetricData.CLUSTER_KEY, statisticSetRecord.getCluster())
289-
.put(CombinedMetricData.SERVICE_KEY, statisticSetRecord.getService())
290-
.put(CombinedMetricData.HOST_KEY, createHost());
306+
// Build a map of dimension key-value pairs dropping any that are to be reaggregated over
307+
final ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
291308

309+
// Grab the explicit cluster and service dimensions from the record
310+
addDimension(CombinedMetricData.CLUSTER_KEY, statisticSetRecord.getCluster(), builder);
311+
addDimension(CombinedMetricData.SERVICE_KEY, statisticSetRecord.getService(), builder);
312+
313+
// Either inject the cluster as the host dimension or grab it from the record dimensions
314+
if (_injectClusterAsHost) {
315+
addDimension(CombinedMetricData.HOST_KEY, createHost(), builder);
316+
} else {
317+
final @Nullable String hostDimension = statisticSetRecord.getDimensionsMap().get(CombinedMetricData.HOST_KEY);
318+
if (hostDimension != null) {
319+
addDimension(CombinedMetricData.HOST_KEY, hostDimension, builder);
320+
}
321+
}
322+
323+
// Inject all other dimensions (e.g. not service, cluster or host)
292324
statisticSetRecord.getDimensionsMap()
293325
.entrySet()
294326
.stream()
295327
.filter(NOT_EXPLICIT_DIMENSION)
296-
.forEach(entry -> builder.put(entry.getKey(), entry.getValue()));
328+
.forEach(entry -> addDimension(entry.getKey(), entry.getValue(), builder));
297329

298330
return builder.build();
299331
}
300332

333+
private void addDimension(final String key, final String value, final ImmutableMap.Builder<String, String> mapBuilder) {
334+
if (!_reaggregationDimensions.contains(key)) {
335+
mapBuilder.put(key, value);
336+
}
337+
}
338+
301339
private String createHost() {
302340
return _cluster + "-cluster" + _clusterHostSuffix;
303341
}
@@ -307,6 +345,8 @@ private String createHost() {
307345
private final ActorRef _lifecycleTracker;
308346
private final ActorRef _periodicStatistics;
309347
private final String _clusterHostSuffix;
348+
private final ImmutableSet<String> _reaggregationDimensions;
349+
private final boolean _injectClusterAsHost;
310350
private final Set<Statistic> _statistics = Sets.newHashSet();
311351
private boolean _initialized = false;
312352
private Period _period;

src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
import com.arpnetworking.commons.builder.OvalBuilder;
1919
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
20-
import com.arpnetworking.utility.InterfaceDatabase;
21-
import com.arpnetworking.utility.ReflectionsDatabase;
2220
import com.fasterxml.jackson.databind.ObjectMapper;
2321
import com.google.common.base.MoreObjects;
22+
import com.google.common.collect.ImmutableSet;
2423
import com.google.common.collect.Maps;
2524
import net.sf.oval.constraint.NotEmpty;
2625
import net.sf.oval.constraint.NotNull;
@@ -94,6 +93,14 @@ public File getClusterPipelineConfiguration() {
9493
return _clusterPipelineConfiguration;
9594
}
9695

96+
public ImmutableSet<String> getReaggregationDimensions() {
97+
return _reaggregationDimensions;
98+
}
99+
100+
public boolean getReaggregationInjectClusterAsHost() {
101+
return _reaggregationInjectClusterAsHost;
102+
}
103+
97104
public RebalanceConfiguration getRebalanceConfiguration() {
98105
return _rebalanceConfiguration;
99106
}
@@ -133,6 +140,8 @@ public String toString() {
133140
.add("AkkaConfiguration", _akkaConfiguration)
134141
.add("HostPipelineConfiguration", _hostPipelineConfiguration)
135142
.add("ClusterPipelineConfiguration", _hostPipelineConfiguration)
143+
.add("ReaggregationDimensions", _reaggregationDimensions)
144+
.add("ReaggregationInjectClusterAsHost", _reaggregationInjectClusterAsHost)
136145
.add("MinConnectionTimeout", _minConnectionTimeout)
137146
.add("MaxConnectionTimeout", _maxConnectionTimeout)
138147
.add("JvmMetricsCollectionInterval", _jvmMetricsCollectionInterval)
@@ -154,6 +163,8 @@ private ClusterAggregatorConfiguration(final Builder builder) {
154163
_akkaConfiguration = Maps.newHashMap(builder._akkaConfiguration);
155164
_hostPipelineConfiguration = builder._hostPipelineConfiguration;
156165
_clusterPipelineConfiguration = builder._clusterPipelineConfiguration;
166+
_reaggregationDimensions = builder._reaggregationDimensions;
167+
_reaggregationInjectClusterAsHost = builder._reaggregationInjectClusterAsHost;
157168
_minConnectionTimeout = builder._minConnectionTimeout;
158169
_maxConnectionTimeout = builder._maxConnectionTimeout;
159170
_jvmMetricsCollectionInterval = builder._jvmMetricsCollectionInterval;
@@ -174,6 +185,8 @@ private ClusterAggregatorConfiguration(final Builder builder) {
174185
private final Map<String, ?> _akkaConfiguration;
175186
private final File _clusterPipelineConfiguration;
176187
private final File _hostPipelineConfiguration;
188+
private final ImmutableSet<String> _reaggregationDimensions;
189+
private final boolean _reaggregationInjectClusterAsHost;
177190
private final Period _minConnectionTimeout;
178191
private final Period _maxConnectionTimeout;
179192
private final Period _jvmMetricsCollectionInterval;
@@ -182,8 +195,6 @@ private ClusterAggregatorConfiguration(final Builder builder) {
182195
private final boolean _calculateClusterAggregations;
183196
private final Map<String, DatabaseConfiguration> _databaseConfigurations;
184197

185-
private static final InterfaceDatabase INTERFACE_DATABASE = ReflectionsDatabase.newInstance();
186-
187198
/**
188199
* Implementation of builder pattern for {@link com.arpnetworking.clusteraggregator.configuration.ClusterAggregatorConfiguration}.
189200
*
@@ -371,6 +382,31 @@ public Builder setHostPipelineConfiguration(final File value) {
371382
return this;
372383
}
373384

385+
/**
386+
* The reaggregation dimensions. Optional. Default is set containing
387+
* {@code host}. Cannot be null.
388+
*
389+
* @param value The regaggregation dimensions.
390+
* @return This instance of <code>Builder</code>.
391+
*/
392+
public Builder setReaggregationDimensions(final ImmutableSet<String> value) {
393+
_reaggregationDimensions = value;
394+
return this;
395+
}
396+
397+
/**
398+
* Whether to inject a {@code host} dimension with a value based on
399+
* the {@code cluster} dimension. Optional. Default is {@code True}.
400+
* Cannot be null.
401+
*
402+
* @param value Whether to inject {@code host} derived from {@code cluster}.
403+
* @return This instance of <code>Builder</code>.
404+
*/
405+
public Builder setReaggregationInjectClusterAsHost(final Boolean value) {
406+
_reaggregationInjectClusterAsHost = value;
407+
return this;
408+
}
409+
374410
/**
375411
* Configuration for the shard rebalance settings.
376412
*
@@ -431,6 +467,10 @@ public Builder setCalculateClusterAggregations(final Boolean value) {
431467
@NotNull
432468
private File _clusterPipelineConfiguration;
433469
@NotNull
470+
private ImmutableSet<String> _reaggregationDimensions = ImmutableSet.of();
471+
@NotNull
472+
private Boolean _reaggregationInjectClusterAsHost = Boolean.TRUE;
473+
@NotNull
434474
private File _hostPipelineConfiguration;
435475
@NotNull
436476
private Map<String, ?> _akkaConfiguration;

src/test/java/com/arpnetworking/clusteraggregator/aggregation/AggregatorTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import akka.testkit.TestActorRef;
2323
import akka.testkit.TestProbe;
2424
import com.arpnetworking.utility.BaseActorTest;
25+
import com.google.common.collect.ImmutableSet;
2526
import org.junit.Assert;
2627
import org.junit.Test;
2728
import scala.concurrent.duration.FiniteDuration;
@@ -51,7 +52,16 @@ public void passivatesProperly() {
5152
public ActorRef createAggregator(final TestProbe probe) {
5253
final TestProbe ignored = TestProbe.apply(getSystem());
5354
return TestActorRef.apply(
54-
AggregationRouter.props(ignored.ref(), ignored.ref(), ignored.ref(), ""), probe.ref(), "agg", getSystem());
55+
AggregationRouter.props(
56+
ignored.ref(),
57+
ignored.ref(),
58+
ignored.ref(),
59+
"",
60+
ImmutableSet.of(),
61+
true),
62+
probe.ref(),
63+
"agg",
64+
getSystem());
5565
}
5666

5767
private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)