Skip to content

Commit e40d6dc

Browse files
authored
Fix and improve periodic statistics (#107)
* Port the PeriodicStatisticsSink updates from MAD and fix populationCount ingestion and computation. * Update jdk-wrapper to version 0.13.1 * Added dimension defaulting like in MAD. * Code review feedback.
1 parent b245ed9 commit e40d6dc

File tree

11 files changed

+875
-136
lines changed

11 files changed

+875
-136
lines changed

.jdkw

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,3 @@ JDKW_RELEASE=latest
22
JDKW_DIST=zulu
33
JDKW_BUILD=8.27.0.7
44
JDKW_VERSION=8.0.162
5-
JDKW_VERBOSE=true

pom.xml

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,10 @@
133133
<logback.version>1.2.3</logback.version>
134134
<logback.steno.version>1.18.0</logback.steno.version>
135135
<log4j.over.slf4j.version>1.7.22</log4j.over.slf4j.version>
136-
<metrics.client.version>0.10.0</metrics.client.version>
137-
<metrics.jvm.extra.version>0.9.0</metrics.jvm.extra.version>
138-
<metrics.http.extra.version>0.9.1</metrics.http.extra.version>
139-
<metrics.aggregator.protocol.version>1.0.4</metrics.aggregator.protocol.version>
136+
<metrics.client.version>0.11.1</metrics.client.version>
137+
<metrics.jvm.extra.version>0.11.0</metrics.jvm.extra.version>
138+
<metrics.http.extra.version>0.11.1</metrics.http.extra.version>
139+
<metrics.aggregator.protocol.version>1.0.6</metrics.aggregator.protocol.version>
140140
<mockito.version>2.12.0</mockito.version>
141141
<netty.version>3.10.3.Final</netty.version>
142142
<netty.all.version>4.0.21.Final</netty.all.version>
@@ -149,8 +149,8 @@
149149
<scala.version>2.12</scala.version>
150150
<scala.java.compat.version>0.8.0</scala.java.compat.version>
151151
<scala.library.version>2.12.3</scala.library.version>
152-
<slf4j.version>1.7.25</slf4j.version>
153152
<signalfx.protoc.version>0.0.23</signalfx.protoc.version>
153+
<slf4j.version>1.7.25</slf4j.version>
154154
<typesafe.config.version>1.3.1</typesafe.config.version>
155155
<typesafe.ssl.config.version>0.2.2</typesafe.ssl.config.version>
156156
<wiremock.version>2.12.0</wiremock.version>
@@ -524,16 +524,37 @@
524524
<groupId>com.arpnetworking.metrics</groupId>
525525
<artifactId>metrics-client</artifactId>
526526
<version>${metrics.client.version}</version>
527+
<exclusions>
528+
<!-- TODO(ville): Remove suppression once findbugs to spotbugs migration is complete in CAGG -->
529+
<exclusion>
530+
<groupId>com.github.spotbugs</groupId>
531+
<artifactId>spotbugs-annotations</artifactId>
532+
</exclusion>
533+
</exclusions>
527534
</dependency>
528535
<dependency>
529536
<groupId>com.arpnetworking.metrics.extras</groupId>
530537
<artifactId>apache-http-sink-extra</artifactId>
531538
<version>${metrics.http.extra.version}</version>
539+
<exclusions>
540+
<!-- TODO(ville): Remove suppression once findbugs to spotbugs migration is complete in CAGG -->
541+
<exclusion>
542+
<groupId>com.github.spotbugs</groupId>
543+
<artifactId>spotbugs-annotations</artifactId>
544+
</exclusion>
545+
</exclusions>
532546
</dependency>
533547
<dependency>
534548
<groupId>com.arpnetworking.metrics.extras</groupId>
535549
<artifactId>jvm-extra</artifactId>
536550
<version>${metrics.jvm.extra.version}</version>
551+
<exclusions>
552+
<!-- TODO(ville): Remove suppression once findbugs to spotbugs migration is complete in CAGG -->
553+
<exclusion>
554+
<groupId>com.github.spotbugs</groupId>
555+
<artifactId>spotbugs-annotations</artifactId>
556+
</exclusion>
557+
</exclusions>
537558
</dependency>
538559
<!-- Google -->
539560
<dependency>

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ private Config provideAkkaConfig() {
147147
private MetricsFactory provideMetricsFactory() throws URISyntaxException {
148148
final Sink sink = new ApacheHttpSink.Builder()
149149
.setUri(new URI(String.format(
150-
"http://%s:%d/metrics/v2/application",
150+
"http://%s:%d/metrics/v3/application",
151151
_configuration.getMonitoringHost(),
152152
_configuration.getMonitoringPort())))
153153
.build();

src/main/java/com/arpnetworking/clusteraggregator/aggregation/StreamingAggregator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,16 @@ public Receive createReceive() {
130130
})
131131
.match(BucketCheck.class, message -> {
132132
if (_initialized) {
133-
while (_aggBuckets.size() > 0) {
133+
while (!_aggBuckets.isEmpty()) {
134134
final StreamingAggregationBucket bucket = _aggBuckets.getFirst();
135135
if (bucket.getPeriodStart().plus(_period).plus(_aggregatorTimeout).isBeforeNow()) {
136136
_aggBuckets.removeFirst();
137137

138138
// Walk over every statistic in the bucket
139139
final Map<Statistic, CalculatedValue<?>> values = bucket.compute();
140+
final long populationSize = CombinedMetricData.computePopulationSizeFromCalculatedValues(
141+
_metric,
142+
values);
140143
final ImmutableList.Builder<AggregatedData> builder = ImmutableList.builder();
141144
for (final Map.Entry<Statistic, CalculatedValue<?>> entry : values.entrySet()) {
142145
_statistics.add(entry.getKey());
@@ -151,7 +154,7 @@ public Receive createReceive() {
151154
.setStart(bucket.getPeriodStart())
152155
.setValue(entry.getValue().getValue())
153156
.setSupportingData(entry.getValue().getData())
154-
.setPopulationSize(0L)
157+
.setPopulationSize(populationSize)
155158
.setIsSpecified(bucket.isSpecified(entry.getKey()))
156159
.build();
157160
LOGGER.debug()

src/main/java/com/arpnetworking/clusteraggregator/client/AggClientConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ private Optional<PeriodicData> buildPeriodicData(final Messages.StatisticSetReco
253253

254254
final ImmutableMap<String, String> dimensions = dimensionBuilder.build();
255255

256+
final long populationSize = CombinedMetricData.computePopulationSize(
257+
setRecord.getMetric(),
258+
combinedMetricData.getCalculatedValues());
259+
256260
for (final Map.Entry<Statistic, CombinedMetricData.StatisticValue> record
257261
: combinedMetricData.getCalculatedValues().entrySet()) {
258262
final AggregatedData aggregatedData = new AggregatedData.Builder()
@@ -265,7 +269,7 @@ private Optional<PeriodicData> buildPeriodicData(final Messages.StatisticSetReco
265269
.setHost(host.get())
266270
.setIsSpecified(record.getValue().getUserSpecified())
267271
.setPeriod(combinedMetricData.getPeriod())
268-
.setPopulationSize(1L)
272+
.setPopulationSize(populationSize)
269273
.setSamples(Collections.emptyList())
270274
.setStart(combinedMetricData.getPeriodStart())
271275
.setSupportingData(record.getValue().getValue().getData())

src/main/java/com/arpnetworking/clusteraggregator/client/HttpSourceActor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ private AggregationRequest parseRecords(final com.arpnetworking.clusteraggregato
244244
}
245245
}
246246
final ImmutableList<AggregationMessage> records = recordsBuilder.build();
247-
if (records.size() == 0) {
247+
if (records.isEmpty()) {
248248
throw new NoRecordsException();
249249
}
250250
return new AggregationRequest.Builder()
@@ -286,6 +286,10 @@ private Optional<PeriodicData> buildPeriodicData(final Messages.StatisticSetReco
286286

287287
final ImmutableMap<String, String> dimensions = dimensionBuilder.build();
288288

289+
final long populationSize = CombinedMetricData.computePopulationSize(
290+
setRecord.getMetric(),
291+
combinedMetricData.getCalculatedValues());
292+
289293
for (final Map.Entry<Statistic, CombinedMetricData.StatisticValue> record
290294
: combinedMetricData.getCalculatedValues().entrySet()) {
291295
final AggregatedData aggregatedData = new AggregatedData.Builder()
@@ -298,7 +302,7 @@ private Optional<PeriodicData> buildPeriodicData(final Messages.StatisticSetReco
298302
.setHost(host.get())
299303
.setIsSpecified(record.getValue().getUserSpecified())
300304
.setPeriod(combinedMetricData.getPeriod())
301-
.setPopulationSize(1L)
305+
.setPopulationSize(populationSize)
302306
.setSamples(Collections.emptyList())
303307
.setStart(combinedMetricData.getPeriodStart())
304308
.setSupportingData(record.getValue().getValue().getData())
@@ -321,7 +325,7 @@ private Optional<PeriodicData> buildPeriodicData(final Messages.StatisticSetReco
321325
private final Graph<FlowShape<HttpRequest, AggregationRequest>, NotUsed> _processGraph;
322326

323327
private static final Logger BAD_REQUEST_LOGGER =
324-
LoggerFactory.getRateLimitLogger(HttpSourceActor.class, Duration.ofSeconds(30));
328+
LoggerFactory.getRateLimitLogger(HttpSourceActor.class, Duration.ofSeconds(30));
325329

326330

327331
private static class NoRecordsException extends IOException {

src/main/java/com/arpnetworking/clusteraggregator/http/Routes.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import com.arpnetworking.metrics.Metrics;
3838
import com.arpnetworking.metrics.MetricsFactory;
3939
import com.arpnetworking.metrics.Timer;
40-
import com.arpnetworking.metrics.Units;
4140
import com.arpnetworking.steno.LogBuilder;
4241
import com.arpnetworking.steno.Logger;
4342
import com.arpnetworking.steno.LoggerFactory;
@@ -104,8 +103,7 @@ public CompletionStage<HttpResponse> apply(final HttpRequest request) {
104103
final Timer timer = metrics.createTimer(createMetricName(request, REQUEST_METRIC));
105104
metrics.setGauge(
106105
createMetricName(request, BODY_SIZE_METRIC),
107-
request.entity().getContentLengthOption().orElse(0L),
108-
Units.BYTE);
106+
request.entity().getContentLengthOption().orElse(0L));
109107
final UUID requestId = UUID.randomUUID();
110108
if (LOGGER.isTraceEnabled()) {
111109
LOGGER.trace()

src/main/java/com/arpnetworking/clusteraggregator/models/CombinedMetricData.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
import com.arpnetworking.tsdcore.statistics.Statistic;
2929
import com.arpnetworking.tsdcore.statistics.StatisticFactory;
3030
import com.google.common.base.Strings;
31+
import com.google.common.collect.ImmutableMap;
3132
import com.google.common.collect.Maps;
3233
import net.sf.oval.constraint.NotNull;
3334
import org.joda.time.DateTime;
3435
import org.joda.time.Period;
3536

37+
import java.time.Duration;
3638
import java.util.Map;
3739
import java.util.Optional;
40+
import javax.annotation.Nullable;
3841

3942
/**
4043
* A metric-based aggregation model. Holds all of the statistics and supporting for a metric on a host.
@@ -80,6 +83,72 @@ public String getCluster() {
8083
return _cluster;
8184
}
8285

86+
/**
87+
* Compute the population size for a metric from its statistic values.
88+
*
89+
* @param metricName the name of the metric
90+
* @param statisticValues the statistic values for the metric.
91+
* @return requires {@link com.arpnetworking.tsdcore.statistics.CountStatistic}
92+
* or {@link HistogramStatistic} to return an accurate count otherwise
93+
* returns 1
94+
*/
95+
public static long computePopulationSize(
96+
final String metricName,
97+
final Map<Statistic, CombinedMetricData.StatisticValue> statisticValues) {
98+
final ImmutableMap.Builder<Statistic, CalculatedValue<?>> requiredCalculatedValues = ImmutableMap.builder();
99+
@Nullable final CombinedMetricData.StatisticValue countStatisticValue = statisticValues.get(COUNT_STATISTIC);
100+
if (countStatisticValue != null) {
101+
requiredCalculatedValues.put(COUNT_STATISTIC, countStatisticValue.getValue());
102+
}
103+
@Nullable final CombinedMetricData.StatisticValue histogramStatisticValue = statisticValues.get(HISTOGRAM_STATISTIC);
104+
if (countStatisticValue != null) {
105+
requiredCalculatedValues.put(HISTOGRAM_STATISTIC, histogramStatisticValue.getValue());
106+
}
107+
return computePopulationSizeFromCalculatedValues(
108+
metricName,
109+
requiredCalculatedValues.build());
110+
}
111+
112+
/**
113+
* Compute the population size for a metric from its calculated values.
114+
*
115+
* @param metricName the name of the metric
116+
* @param calculatedValues the calculated values for the metric.
117+
* @return requires {@link com.arpnetworking.tsdcore.statistics.CountStatistic}
118+
* or {@link HistogramStatistic} to return an accurate count otherwise
119+
* returns 1
120+
*/
121+
public static long computePopulationSizeFromCalculatedValues(
122+
final String metricName,
123+
final Map<Statistic, CalculatedValue<?>> calculatedValues) {
124+
// Compute the population size either via the count statistic or
125+
// via histogram bin counting.
126+
@Nullable final CalculatedValue<?> histogramCalculatedValue = calculatedValues.get(HISTOGRAM_STATISTIC);
127+
@Nullable final CalculatedValue<?> countCalculatedValue = calculatedValues.get(COUNT_STATISTIC);
128+
129+
// Prefer using the histogram since it's value is always accurate
130+
if (histogramCalculatedValue != null) {
131+
if (histogramCalculatedValue.getData() instanceof HistogramStatistic.HistogramSupportingData) {
132+
final HistogramStatistic.HistogramSupportingData supportingData =
133+
(HistogramStatistic.HistogramSupportingData) histogramCalculatedValue.getData();
134+
return supportingData.getHistogramSnapshot().getEntriesCount();
135+
}
136+
}
137+
138+
// Fallback to using the count since it's value is a double
139+
if (countCalculatedValue != null) {
140+
return (long) countCalculatedValue.getValue().getValue();
141+
}
142+
143+
// Take some backwards compatible default behavior, but let's log that
144+
// it's a problem.
145+
NO_POPULATION_SIZE_LOGGER.warn()
146+
.setMessage("Unable to compute population size")
147+
.addData("metric", metricName)
148+
.log();
149+
return 1L;
150+
}
151+
83152
/**
84153
* The key used in dimensions to specify the host-dimension.
85154
*/
@@ -100,6 +169,12 @@ public String getCluster() {
100169
private final String _service;
101170
private final String _cluster;
102171

172+
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
173+
private static final Statistic COUNT_STATISTIC = STATISTIC_FACTORY.getStatistic("count");
174+
private static final Statistic HISTOGRAM_STATISTIC = STATISTIC_FACTORY.getStatistic("histogram");
175+
private static final Logger NO_POPULATION_SIZE_LOGGER =
176+
LoggerFactory.getRateLimitLogger(CombinedMetricData.class, Duration.ofSeconds(30));
177+
103178
/**
104179
* Implementation of builder pattern for {@link CombinedMetricData}.
105180
*

0 commit comments

Comments
 (0)