Skip to content

Commit d166507

Browse files
orbordevjkoskela
andauthored
Track the minimum request timestamp aggregated into a MAD bucket, and push that to CAGG during flush (#188)
Co-authored-by: William Ehlhardt <[email protected]> Co-authored-by: Ville Koskela <[email protected]>
1 parent 9621815 commit d166507

File tree

10 files changed

+306
-9
lines changed

10 files changed

+306
-9
lines changed

src/main/java/com/arpnetworking/metrics/mad/Bucket.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public void close() {
7676
b -> b.setData(data.build())
7777
.setDimensions(_key)
7878
.setPeriod(_period)
79-
.setStart(_start));
79+
.setStart(_start)
80+
.setMinRequestTime(_minRequestTime.orElse(null)));
8081
_sink.recordAggregateData(periodicData);
8182
} else {
8283
LOGGER.warn()
@@ -107,6 +108,7 @@ public void add(final Record record) {
107108
}
108109

109110

111+
110112
for (final Map.Entry<String, ? extends Metric> entry : record.getMetrics().entrySet()) {
111113
final String name = entry.getKey();
112114
final Metric metric = entry.getValue();
@@ -178,6 +180,19 @@ public void add(final Record record) {
178180
}
179181
addMetric(metric, calculators);
180182
}
183+
184+
updateMinRequestTime(record);
185+
}
186+
187+
private void updateMinRequestTime(final Record record) {
188+
if (record.getRequestTime().isPresent()) {
189+
if (!_minRequestTime.isPresent()) {
190+
_minRequestTime = record.getRequestTime();
191+
}
192+
if (record.getRequestTime().get().isBefore(_minRequestTime.get())) {
193+
_minRequestTime = record.getRequestTime();
194+
}
195+
}
181196
}
182197

183198
public ZonedDateTime getStart() {
@@ -349,6 +364,7 @@ private Collection<Calculator<?>> getOrCreateCalculators(
349364
}
350365

351366
private boolean _isOpen = true;
367+
private Optional<ZonedDateTime> _minRequestTime = Optional.empty();
352368

353369
private final Map<String, Collection<Calculator<?>>> _counterMetricCalculators = Maps.newHashMap();
354370
private final Map<String, Collection<Calculator<?>>> _gaugeMetricCalculators = Maps.newHashMap();

src/main/java/com/arpnetworking/metrics/mad/model/DefaultRecord.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import net.sf.oval.constraint.NotNull;
2525

2626
import java.time.ZonedDateTime;
27+
import java.util.Optional;
28+
import javax.annotation.Nullable;
29+
2730

2831
/**
2932
* Default implementation of the <code>Record</code> interface.
@@ -49,6 +52,11 @@ public ZonedDateTime getTime() {
4952
return _time;
5053
}
5154

55+
@Override
56+
public Optional<ZonedDateTime> getRequestTime() {
57+
return _requestTime;
58+
}
59+
5260
@Override
5361
public ImmutableMap<String, ? extends Metric> getMetrics() {
5462
return _metrics;
@@ -101,13 +109,15 @@ private DefaultRecord(final Builder builder) {
101109
_metrics = builder._metrics;
102110
_id = builder._id;
103111
_time = builder._time;
112+
_requestTime = Optional.ofNullable(builder._requestTime);
104113
_annotations = builder._annotations;
105114
_dimensions = builder._dimensions;
106115
}
107116

108117
private final ImmutableMap<String, ? extends Metric> _metrics;
109118
private final String _id;
110119
private final ZonedDateTime _time;
120+
private final Optional<ZonedDateTime> _requestTime;
111121
private final ImmutableMap<String, String> _annotations;
112122
private final ImmutableMap<String, String> _dimensions;
113123

@@ -158,6 +168,17 @@ public Builder setTime(final ZonedDateTime value) {
158168
return this;
159169
}
160170

171+
/**
172+
* The timestamp at which the record was received. Can be null.
173+
*
174+
* @param value The timestamp.
175+
* @return This instance of <code>Builder</code>.
176+
*/
177+
public Builder setRequestTime(final ZonedDateTime value) {
178+
_requestTime = value;
179+
return this;
180+
}
181+
161182
/**
162183
* The annotations <code>ImmutableMap</code>. Optional. Default is an empty
163184
* <code>ImmutableMap</code>. Cannot be null.
@@ -189,6 +210,7 @@ protected void reset() {
189210
_time = null;
190211
_annotations = ImmutableMap.of();
191212
_dimensions = ImmutableMap.of();
213+
_requestTime = null;
192214
}
193215

194216
@NotNull
@@ -198,6 +220,8 @@ protected void reset() {
198220
private String _id;
199221
@NotNull
200222
private ZonedDateTime _time;
223+
@Nullable
224+
private ZonedDateTime _requestTime;
201225
@NotNull
202226
private ImmutableMap<String, String> _annotations = ImmutableMap.of();
203227
@NotNull

src/main/java/com/arpnetworking/metrics/mad/model/Record.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.common.collect.ImmutableMap;
1919

2020
import java.time.ZonedDateTime;
21+
import java.util.Optional;
2122

2223
/**
2324
* The interface to a record. Records consistent of a timestamp, any number of
@@ -43,6 +44,13 @@ public interface Record {
4344
*/
4445
ZonedDateTime getTime();
4546

47+
/**
48+
* Gets the "received at" time stamp of the record.
49+
*
50+
* @return the time stamp
51+
*/
52+
Optional<ZonedDateTime> getRequestTime();
53+
4654
/**
4755
* Gets metrics.
4856
*

src/main/java/com/arpnetworking/metrics/mad/parsers/ProtobufV3ToRecordParser.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,16 @@ public List<Record> parse(final HttpRequest data) throws ParsingException {
6868
final long low = byteBuffer.getLong();
6969
records.add(ThreadLocalBuilder.build(
7070
DefaultRecord.Builder.class,
71-
builder -> builder.setId(new UUID(high, low).toString())
72-
.setTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.getEndMillisSinceEpoch()), ZoneOffset.UTC))
73-
.setDimensions(buildDimensions(record))
74-
.setMetrics(buildMetrics(record))));
71+
builder -> {
72+
builder.setId(new UUID(high, low).toString())
73+
.setTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.getEndMillisSinceEpoch()), ZoneOffset.UTC))
74+
.setDimensions(buildDimensions(record))
75+
.setMetrics(buildMetrics(record));
76+
if (record.getRequestMillisSinceEpoch() != 0) {
77+
builder.setRequestTime(
78+
ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.getRequestMillisSinceEpoch()), ZoneOffset.UTC));
79+
}
80+
}));
7581
}
7682
return records;
7783
} catch (final InvalidProtocolBufferException e) {

src/main/java/com/arpnetworking/tsdcore/model/PeriodicData.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
import java.time.Duration;
2727
import java.time.ZonedDateTime;
28+
import java.util.Optional;
29+
import javax.annotation.Nullable;
2830

2931
/**
3032
* Contains the data for a specific period in time.
@@ -42,6 +44,10 @@ public ZonedDateTime getStart() {
4244
return _start;
4345
}
4446

47+
public Optional<ZonedDateTime> getMinRequestTime() {
48+
return _minRequestTime;
49+
}
50+
4551
public Key getDimensions() {
4652
return _dimensions;
4753
}
@@ -64,7 +70,8 @@ public boolean equals(final Object object) {
6470
return Objects.equal(_data, other._data)
6571
&& Objects.equal(_dimensions, other._dimensions)
6672
&& Objects.equal(_period, other._period)
67-
&& Objects.equal(_start, other._start);
73+
&& Objects.equal(_start, other._start)
74+
&& Objects.equal(_minRequestTime, other._minRequestTime);
6875
}
6976

7077
@Override
@@ -73,7 +80,8 @@ public int hashCode() {
7380
_data,
7481
_dimensions,
7582
_period,
76-
_start);
83+
_start,
84+
_minRequestTime);
7785
}
7886

7987
@Override
@@ -84,18 +92,21 @@ public String toString() {
8492
.add("Start", _start)
8593
.add("Dimensions", _dimensions)
8694
.add("Data", _data)
95+
.add("MinRequestTime", _minRequestTime)
8796
.toString();
8897
}
8998

9099
private PeriodicData(final Builder builder) {
91100
_period = builder._period;
92101
_start = builder._start;
102+
_minRequestTime = Optional.ofNullable(builder._minRequestTime);
93103
_dimensions = builder._dimensions;
94104
_data = builder._data;
95105
}
96106

97107
private final Duration _period;
98108
private final ZonedDateTime _start;
109+
private final Optional<ZonedDateTime> _minRequestTime;
99110
private final Key _dimensions;
100111
private final ImmutableMultimap<String, AggregatedData> _data;
101112

@@ -133,6 +144,18 @@ public Builder setStart(final ZonedDateTime value) {
133144
return this;
134145
}
135146

147+
/**
148+
* Set the earliest time at which data in this bucket was recorded. May be null.
149+
*
150+
* @param value The timestamp
151+
* @return This <code>Builder</code> instance.
152+
*/
153+
public Builder setMinRequestTime(@Nullable final ZonedDateTime value) {
154+
_minRequestTime = value;
155+
return this;
156+
}
157+
158+
136159
/**
137160
* Set the dimensions. Required. Cannot be null.
138161
*
@@ -159,6 +182,7 @@ public Builder setData(final ImmutableMultimap<String, AggregatedData> value) {
159182
protected void reset() {
160183
_period = null;
161184
_start = null;
185+
_minRequestTime = null;
162186
_dimensions = null;
163187
_data = ImmutableMultimap.of();
164188
}
@@ -167,6 +191,9 @@ protected void reset() {
167191
private Duration _period;
168192
@NotNull
169193
private ZonedDateTime _start;
194+
@Nullable
195+
private ZonedDateTime _minRequestTime;
196+
170197
@NotNull
171198
private Key _dimensions;
172199
@NotNull
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2020 Dropbox Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.tsdcore.sinks;
17+
18+
import com.arpnetworking.metrics.aggregation.protocol.Messages;
19+
import com.arpnetworking.metrics.mad.model.AggregatedData;
20+
import com.arpnetworking.metrics.mad.model.statistics.HistogramStatistic;
21+
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
22+
import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory;
23+
import com.arpnetworking.tsdcore.model.AggregationMessage;
24+
import com.arpnetworking.tsdcore.model.PeriodicData;
25+
import com.google.protobuf.ByteString;
26+
27+
import java.util.Collection;
28+
import java.util.Map;
29+
import java.util.Objects;
30+
31+
/**
32+
* Utility class to serialize to metrics-aggregator-protocol protobuf messages.
33+
*
34+
* @author William Ehlhardt (whale at dropbox dot com)
35+
*/
36+
public final class MetricsDataSerializer {
37+
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
38+
private static final Statistic EXPRESSION_STATISTIC = STATISTIC_FACTORY.getStatistic("expression");
39+
40+
/**
41+
* Serialize a metric's data to a StatisticSetRecord.
42+
*
43+
* @param periodicData Originating PeriodicData
44+
* @param metricName Name of metric being serialized.
45+
* @param data Recorded metric data to serialize.
46+
* @return StatisticSetRecord protobuf corresponding to the above.
47+
*/
48+
public static Messages.StatisticSetRecord serializeMetricData(
49+
final PeriodicData periodicData,
50+
final String metricName,
51+
final Collection<AggregatedData> data) {
52+
53+
// Create a statistic record set
54+
final Messages.StatisticSetRecord.Builder builder = Messages.StatisticSetRecord.newBuilder()
55+
.setMetric(metricName)
56+
.setPeriod(periodicData.getPeriod().toString())
57+
.setPeriodStart(periodicData.getStart().toString())
58+
.setClientMinimumRequestTime(periodicData.getMinRequestTime().map(t -> t.toString()).orElse(""))
59+
.putAllDimensions(periodicData.getDimensions().getParameters())
60+
.setCluster(periodicData.getDimensions().getCluster())
61+
.setService(periodicData.getDimensions().getService());
62+
63+
for (final AggregatedData datum : data) {
64+
if (Objects.equals(EXPRESSION_STATISTIC, datum.getStatistic())) {
65+
continue;
66+
}
67+
68+
final String unit;
69+
if (datum.getValue().getUnit().isPresent()) {
70+
// TODO(ville): The protocol needs to support compound units.
71+
unit = datum.getValue().getUnit().get().toString();
72+
} else {
73+
unit = "";
74+
}
75+
76+
final Messages.StatisticRecord.Builder entryBuilder = builder.addStatisticsBuilder()
77+
.setStatistic(datum.getStatistic().getName())
78+
.setValue(datum.getValue().getValue())
79+
.setUnit(unit)
80+
.setUserSpecified(datum.getIsSpecified());
81+
82+
final ByteString supportingData = serializeSupportingData(datum);
83+
if (supportingData != null) {
84+
entryBuilder.setSupportingData(supportingData);
85+
}
86+
entryBuilder.build();
87+
}
88+
89+
return builder.build();
90+
}
91+
92+
private static ByteString serializeSupportingData(final AggregatedData datum) {
93+
final Object data = datum.getSupportingData();
94+
final ByteString byteString;
95+
if (data instanceof HistogramStatistic.HistogramSupportingData) {
96+
final HistogramStatistic.HistogramSupportingData histogramSupportingData = (HistogramStatistic.HistogramSupportingData) data;
97+
final Messages.SparseHistogramSupportingData.Builder builder = Messages.SparseHistogramSupportingData.newBuilder();
98+
final HistogramStatistic.HistogramSnapshot histogram = histogramSupportingData.getHistogramSnapshot();
99+
final String unit;
100+
if (histogramSupportingData.getUnit().isPresent()) {
101+
// TODO(ville): The protocol needs to support compound units.
102+
unit = histogramSupportingData.getUnit().get().toString();
103+
} else {
104+
unit = "";
105+
}
106+
builder.setUnit(unit);
107+
108+
for (final Map.Entry<Double, Long> entry : histogram.getValues()) {
109+
builder.addEntriesBuilder()
110+
.setBucket(entry.getKey())
111+
.setCount(entry.getValue())
112+
.build();
113+
}
114+
byteString = ByteString.copyFrom(
115+
AggregationMessage.create(builder.build()).serializeToByteString().toArray());
116+
} else {
117+
return null;
118+
}
119+
return byteString;
120+
}
121+
122+
private MetricsDataSerializer() {
123+
throw new AssertionError("utility class should not be instantiated");
124+
}
125+
}

0 commit comments

Comments
 (0)