Skip to content

Commit 7aa1f29

Browse files
authored
Reaggregation (#168)
* Add support for aggregation pre-aggregated values (e.g. from the ISM V3 protocol). * Pre-aggregation integration test.
1 parent 7ea7b54 commit 7aa1f29

20 files changed

+622
-227
lines changed

src/main/java/com/arpnetworking/metrics/mad/Aggregator.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,7 @@ public Optional<ImmutableSet<Statistic>> load(final String metric) throws Except
220220
@Override
221221
public Optional<ImmutableSet<Statistic>> load(final String metric) throws Exception {
222222
final Optional<ImmutableSet<Statistic>> statistics = _cachedSpecifiedStatistics.get(metric);
223-
if (statistics.isPresent()) {
224-
return Optional.of(computeDependentStatistics(statistics.get()));
225-
} else {
226-
return Optional.empty();
227-
}
223+
return statistics.map(statisticImmutableSet -> computeDependentStatistics(statisticImmutableSet));
228224
}
229225
});
230226
}

src/main/java/com/arpnetworking/metrics/mad/Bucket.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.arpnetworking.tsdcore.model.PeriodicData;
3636
import com.arpnetworking.tsdcore.sinks.Sink;
3737
import com.google.common.cache.LoadingCache;
38+
import com.google.common.collect.ImmutableList;
3839
import com.google.common.collect.ImmutableMultimap;
3940
import com.google.common.collect.ImmutableSet;
4041
import com.google.common.collect.Maps;
@@ -76,16 +77,6 @@ public void close() {
7677
computeStatistics(_gaugeMetricCalculators, _specifiedGaugeStatistics, data);
7778
computeStatistics(_timerMetricCalculators, _specifiedTimerStatistics, data);
7879
computeStatistics(_explicitMetricCalculators, _specifiedStatisticsCache, data);
79-
// TODO(vkoskela): Perform expression evaluation here. [NEXT]
80-
// -> This still requires realizing and indexing the computed aggregated data
81-
// in order to feed the expression evaluation. Once the filtering is consolidated
82-
// we can probably just build a map here and then do one copy into immutable form
83-
// in the PeriodicData. This becomes feasible with consolidated filtering because
84-
// fewer copies (e.g. none) are made downstream.
85-
// TODO(vkoskela): Perform alert evaluation here. [NEXT]
86-
// -> This requires expressions. Otherwise, it's just a matter of changing the
87-
// alerts abstraction from a Sink to something more appropriate and hooking it in
88-
// here.
8980
final PeriodicData periodicData = ThreadLocalBuilder.build(
9081
PeriodicData.Builder.class,
9182
b -> b.setData(data.build())
@@ -114,10 +105,10 @@ public void add(final Record record) {
114105
final String name = entry.getKey();
115106
final Metric metric = entry.getValue();
116107

117-
if (metric.getValues().isEmpty()) {
108+
if (metric.getValues().isEmpty() && metric.getStatistics().isEmpty()) {
118109
LOGGER.debug()
119110
.setMessage("Discarding metric")
120-
.addData("reason", "no samples")
111+
.addData("reason", "no samples or statistics")
121112
.addData("name", name)
122113
.addData("metric", metric)
123114
.log();
@@ -320,14 +311,18 @@ private void addMetric(
320311
return;
321312
}
322313

323-
// Add the value to any accumulators
314+
// Add the metric data to any accumulators
324315
for (final Calculator<?> calculator : calculators) {
316+
final Statistic statistic = calculator.getStatistic();
325317
if (calculator instanceof Accumulator) {
326318
final Accumulator<?> accumulator = (Accumulator<?>) calculator;
327319
synchronized (accumulator) {
328320
for (final Quantity quantity : metric.getValues()) {
329321
accumulator.accumulate(quantity);
330322
}
323+
for (final CalculatedValue<?> value : metric.getStatistics().getOrDefault(statistic, ImmutableList.of())) {
324+
accumulator.accumulateAny(value);
325+
}
331326
}
332327
}
333328
}
@@ -351,6 +346,7 @@ private Collection<Calculator<?>> getOrCreateCalculators(
351346
newCalculators.add(statistic.createCalculator());
352347
}
353348
newCalculators.add(COUNT_STATISTIC.createCalculator());
349+
354350
calculators = calculatorsByMetric.putIfAbsent(name, newCalculators);
355351
if (calculators == null) {
356352
calculators = newCalculators;

src/main/java/com/arpnetworking/metrics/mad/model/DefaultMetric.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
import com.arpnetworking.commons.builder.ThreadLocalBuilder;
1919
import com.arpnetworking.logback.annotations.LogValue;
20+
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
2021
import com.arpnetworking.steno.LogValueMapFactory;
22+
import com.arpnetworking.tsdcore.model.CalculatedValue;
2123
import com.google.common.base.MoreObjects;
2224
import com.google.common.base.Objects;
2325
import com.google.common.collect.ImmutableList;
26+
import com.google.common.collect.ImmutableMap;
2427
import net.sf.oval.constraint.NotNull;
2528

2629
import java.util.List;
@@ -44,7 +47,7 @@ public List<Quantity> getValues() {
4447
}
4548

4649
@Override
47-
public List<AggregatedData> getStatistics() {
50+
public ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> getStatistics() {
4851
return _statistics;
4952
}
5053

@@ -106,7 +109,7 @@ private DefaultMetric(final Builder builder) {
106109

107110
private final MetricType _type;
108111
private final ImmutableList<Quantity> _values;
109-
private final ImmutableList<AggregatedData> _statistics;
112+
private final ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> _statistics;
110113

111114
/**
112115
* Implementation of builder pattern for {@link DefaultMetric}.
@@ -123,12 +126,12 @@ public Builder() {
123126
}
124127

125128
/**
126-
* The statistics {@code List}. Cannot be null.
129+
* The statistics {@code Map}. Cannot be null.
127130
*
128131
* @param value The values {@code List}.
129132
* @return This instance of {@link Builder}.
130133
*/
131-
public Builder setStatistics(final ImmutableList<AggregatedData> value) {
134+
public Builder setStatistics(final ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> value) {
132135
_statistics = value;
133136
return this;
134137
}
@@ -162,7 +165,7 @@ protected void reset() {
162165
}
163166

164167
@NotNull
165-
private ImmutableList<AggregatedData> _statistics = ImmutableList.of();
168+
private ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> _statistics = ImmutableMap.of();
166169
@NotNull
167170
private ImmutableList<Quantity> _values = ImmutableList.of();
168171
@NotNull

src/main/java/com/arpnetworking/metrics/mad/model/Metric.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
*/
1616
package com.arpnetworking.metrics.mad.model;
1717

18+
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
19+
import com.arpnetworking.tsdcore.model.CalculatedValue;
20+
import com.google.common.collect.ImmutableList;
21+
import com.google.common.collect.ImmutableMap;
22+
1823
import java.util.List;
1924

2025
/**
@@ -43,5 +48,5 @@ public interface Metric {
4348
*
4449
* @return The collected statistical data.
4550
*/
46-
List<AggregatedData> getStatistics();
51+
ImmutableMap<Statistic, ImmutableList<CalculatedValue<?>>> getStatistics();
4752
}

src/main/java/com/arpnetworking/metrics/mad/model/statistics/Accumulator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,19 @@ public interface Accumulator<T> extends Calculator<T> {
4949
* @return This <code>Accumulator</code>.
5050
*/
5151
Accumulator<T> accumulate(CalculatedValue<T> calculatedValue);
52+
53+
/**
54+
* Add the specified <code>CalculatedValue</code> to the accumulated value. The
55+
* <code>CalculatedValue</code> was produced by this <code>Accumulator</code> in
56+
* a different context. For example, for a different time period or a different
57+
* host. It is permissible to mix calls to accumulate with <code>Quantity</code>
58+
* and <code>CalculatedValue</code>.
59+
*
60+
* If the <code>CalculatedValue</code>'s supporting data is of an unsupported
61+
* type then an <code>IllegaglArgumentException</code> will be thrown.
62+
*
63+
* @param calculatedValue The <code>CalculatedValue</code> to include in the accumulated value.
64+
* @return This <code>Accumulator</code>.
65+
*/
66+
Accumulator<T> accumulateAny(CalculatedValue<?> calculatedValue);
5267
}

src/main/java/com/arpnetworking/metrics/mad/model/statistics/CountStatistic.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {
7676

7777
@Override
7878
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
79+
return accumulateAny(calculatedValue);
80+
}
81+
82+
@Override
83+
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
7984
_count += calculatedValue.getValue().getValue();
8085
return this;
8186
}

src/main/java/com/arpnetworking/metrics/mad/model/statistics/HistogramStatistic.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,27 @@ public Accumulator<HistogramSupportingData> accumulate(final CalculatedValue<His
9696
return this;
9797
}
9898

99+
@Override
100+
public Accumulator<HistogramSupportingData> accumulateAny(final CalculatedValue<?> calculatedValue) {
101+
if (calculatedValue.getData() == null) {
102+
throw new IllegalArgumentException(
103+
String.format(
104+
"Null calculated value data for %s",
105+
this.getClass()));
106+
}
107+
if (!(calculatedValue.getData() instanceof HistogramSupportingData)) {
108+
throw new IllegalArgumentException(
109+
String.format(
110+
"Unsupported calculated value data type %s for %s",
111+
calculatedValue.getData().getClass(),
112+
this.getClass()));
113+
}
114+
@SuppressWarnings("unchecked")
115+
final CalculatedValue<HistogramSupportingData> checkedCalculatedValue =
116+
(CalculatedValue<HistogramSupportingData>) calculatedValue;
117+
return accumulate(checkedCalculatedValue);
118+
}
119+
99120
@Override
100121
public CalculatedValue<HistogramSupportingData> calculate(final Map<Statistic, Calculator<?>> dependencies) {
101122
return ThreadLocalBuilder.<

src/main/java/com/arpnetworking/metrics/mad/model/statistics/MaxStatistic.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {
9494

9595
@Override
9696
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
97+
return accumulateAny(calculatedValue);
98+
}
99+
100+
@Override
101+
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
97102
return accumulate(calculatedValue.getValue());
98103
}
99104

src/main/java/com/arpnetworking/metrics/mad/model/statistics/MinStatistic.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {
9292

9393
@Override
9494
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
95+
return accumulateAny(calculatedValue);
96+
}
97+
98+
@Override
99+
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
95100
return accumulate(calculatedValue.getValue());
96101
}
97102

src/main/java/com/arpnetworking/metrics/mad/model/statistics/SumStatistic.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ public Accumulator<Void> accumulate(final Quantity quantity) {
7777

7878
@Override
7979
public Accumulator<Void> accumulate(final CalculatedValue<Void> calculatedValue) {
80+
return accumulateAny(calculatedValue);
81+
}
82+
83+
@Override
84+
public Accumulator<Void> accumulateAny(final CalculatedValue<?> calculatedValue) {
8085
return accumulate(calculatedValue.getValue());
8186
}
8287

0 commit comments

Comments
 (0)