Skip to content

Commit d7e8d54

Browse files
authored
Make the dimensions move through the cluster aggregation. (#69)
1 parent 6eaca91 commit d7e8d54

File tree

1 file changed

+24
-2
lines changed

1 file changed

+24
-2
lines changed

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Optional;
5252
import java.util.Set;
5353
import java.util.concurrent.TimeUnit;
54+
import java.util.function.Predicate;
5455

5556
/**
5657
* Actual actor responsible for aggregating.
@@ -161,7 +162,7 @@ public Receive createReceive() {
161162
}
162163
final PeriodicData periodicData = new PeriodicData.Builder()
163164
.setData(builder.build())
164-
.setDimensions(ImmutableMap.of("host", createHost()))
165+
.setDimensions(_dimensions)
165166
.setConditions(ImmutableList.of())
166167
.setPeriod(_period)
167168
.setStart(bucket.getPeriodStart())
@@ -214,11 +215,12 @@ private void processAggregationMessage(final Messages.StatisticSetRecord data) {
214215
_cluster = metricData.getCluster();
215216
_metric = metricData.getMetricName();
216217
_service = metricData.getService();
218+
_dimensions = dimensionsToMap(data);
217219
_resultBuilder = new AggregatedData.Builder()
218220
.setHost(createHost())
219221
.setPeriod(_period)
220222
.setPopulationSize(1L)
221-
.setSamples(Collections.<Quantity>emptyList())
223+
.setSamples(Collections.emptyList())
222224
.setStart(DateTime.now().hourOfDay().roundFloorCopy())
223225
.setValue(new Quantity.Builder().setValue(0d).build());
224226

@@ -292,6 +294,21 @@ private void processAggregationMessage(final Messages.StatisticSetRecord data) {
292294
}
293295
}
294296

297+
private ImmutableMap<String, String> dimensionsToMap(final Messages.StatisticSetRecord statisticSetRecord) {
298+
final ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
299+
.put(CombinedMetricData.CLUSTER_KEY, statisticSetRecord.getCluster())
300+
.put(CombinedMetricData.SERVICE_KEY, statisticSetRecord.getService())
301+
.put(CombinedMetricData.HOST_KEY, createHost());
302+
303+
statisticSetRecord.getDimensionsMap()
304+
.entrySet()
305+
.stream()
306+
.filter(NOT_EXPLICIT_DIMENSION)
307+
.forEach(entry -> builder.put(entry.getKey(), entry.getValue()));
308+
309+
return builder.build();
310+
}
311+
295312
private String createHost() {
296313
return _cluster + "-cluster" + _clusterHostSuffix;
297314
}
@@ -307,9 +324,14 @@ private String createHost() {
307324
private String _cluster;
308325
private String _metric;
309326
private String _service;
327+
private ImmutableMap<String, String> _dimensions;
310328
private AggregatedData.Builder _resultBuilder;
311329
private static final Duration AGG_TIMEOUT = Duration.standardMinutes(1);
312330
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingAggregator.class);
331+
private static final Predicate<Map.Entry<String, String>> NOT_EXPLICIT_DIMENSION = entry ->
332+
!(entry.getKey().equals(CombinedMetricData.CLUSTER_KEY)
333+
|| entry.getKey().equals(CombinedMetricData.HOST_KEY)
334+
|| entry.getKey().equals(CombinedMetricData.SERVICE_KEY));
313335

314336
private static final class BucketCheck implements Serializable {
315337
private static final long serialVersionUID = 1L;

0 commit comments

Comments
 (0)