Skip to content

Commit 97fd810

Browse files
Include unit support for time series rate aggregation (#96605)
Here we include support for the unit parameter to the time series rate aggregation. Parsing the field was already there, we just make use of it so that calculation of the rate is done using the right temporal unit, instead of just using seconds. Before this change the rate was calculated using the number of milliseconds at the denominator later scaled to seconds. Now we scale that interval to whatever unit the user provides as unit. Supported values include: second, minute, hour, day, week, month, quarter and year. We fix a bug too. Results were returned as value / time interval (milliseconds) while all tests were written as if result was returned as value / time interval (seconds). We need to multiply everything by 1000 (milliseconds in 1 second). Resolves #94630
1 parent 76b30eb commit 97fd810

File tree

7 files changed

+270
-34
lines changed

7 files changed

+270
-34
lines changed

docs/changelog/96605.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 96605
2+
summary: "Feature: include unit support for time series rate aggregation"
3+
area: TSDB
4+
type: enhancement
5+
issues:
6+
- 94630

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,17 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
132132
public static final TransportVersion V_8_500_006 = registerTransportVersion(8_500_006, "7BB5621A-80AC-425F-BA88-75543C442F23");
133133
public static final TransportVersion V_8_500_007 = registerTransportVersion(8_500_007, "77261d43-4149-40af-89c5-7e71e0454fce");
134134
public static final TransportVersion V_8_500_008 = registerTransportVersion(8_500_008, "8884ab9d-94cd-4bac-aff8-01f2c394f47c");
135+
135136
public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1");
136137
public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675");
137138
public static final TransportVersion V_8_500_011 = registerTransportVersion(8_500_011, "2209F28D-B52E-4BC4-9889-E780F291C32E");
138139
public static final TransportVersion V_8_500_012 = registerTransportVersion(8_500_012, "BB6F4AF1-A860-4FD4-A138-8150FFBE0ABD");
139140
public static final TransportVersion V_8_500_013 = registerTransportVersion(8_500_013, "f65b85ac-db5e-4558-a487-a1dde4f6a33a");
140141
public static final TransportVersion V_8_500_014 = registerTransportVersion(8_500_014, "D115A2E1-1739-4A02-AB7B-64F6EA157EFB");
142+
public static final TransportVersion V_8_500_015 = registerTransportVersion(8_500_015, "651216c9-d54f-4189-9fe1-48d82d276863");
141143

142144
private static class CurrentHolder {
143-
private static final TransportVersion CURRENT = findCurrent(V_8_500_014);
145+
private static final TransportVersion CURRENT = findCurrent(V_8_500_015);
144146

145147
// finds the pluggable current version, or uses the given fallback
146148
private static TransportVersion findCurrent(TransportVersion fallback) {

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/InternalResetTrackingRate.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.analytics.rate;
99

10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.common.Rounding;
1012
import org.elasticsearch.common.io.stream.StreamInput;
1113
import org.elasticsearch.common.io.stream.StreamOutput;
1214
import org.elasticsearch.search.DocValueFormat;
@@ -19,17 +21,21 @@
1921
import java.util.Comparator;
2022
import java.util.List;
2123
import java.util.Map;
24+
import java.util.Objects;
2225

2326
public class InternalResetTrackingRate extends InternalNumericMetricsAggregation.SingleValue implements Rate {
2427

2528
public static final String NAME = "rate_with_resets";
29+
private static final int MILLIS_IN_SECOND = 1_000;
2630

2731
private final double startValue;
2832
private final double endValue;
2933
private final long startTime;
3034
private final long endTime;
3135
private final double resetCompensation;
3236

37+
private final Rounding.DateTimeUnit rateUnit;
38+
3339
protected InternalResetTrackingRate(
3440
String name,
3541
DocValueFormat format,
@@ -38,14 +44,16 @@ protected InternalResetTrackingRate(
3844
double endValue,
3945
long startTime,
4046
long endTime,
41-
double resetCompensation
47+
double resetCompensation,
48+
Rounding.DateTimeUnit rateUnit
4249
) {
4350
super(name, format, metadata);
4451
this.startValue = startValue;
4552
this.endValue = endValue;
4653
this.startTime = startTime;
4754
this.endTime = endTime;
4855
this.resetCompensation = resetCompensation;
56+
this.rateUnit = Objects.requireNonNull(rateUnit);
4957
}
5058

5159
public InternalResetTrackingRate(StreamInput in) throws IOException {
@@ -55,6 +63,11 @@ public InternalResetTrackingRate(StreamInput in) throws IOException {
5563
this.startTime = in.readLong();
5664
this.endTime = in.readLong();
5765
this.resetCompensation = in.readDouble();
66+
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_015)) {
67+
this.rateUnit = Rounding.DateTimeUnit.resolve(in.readByte());
68+
} else {
69+
this.rateUnit = Rounding.DateTimeUnit.SECOND_OF_MINUTE;
70+
}
5871
}
5972

6073
@Override
@@ -69,6 +82,11 @@ protected void doWriteTo(StreamOutput out) throws IOException {
6982
out.writeLong(startTime);
7083
out.writeLong(endTime);
7184
out.writeDouble(resetCompensation);
85+
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_015) && rateUnit != null) {
86+
out.writeByte(rateUnit.getId());
87+
} else {
88+
out.writeByte(Rounding.DateTimeUnit.SECOND_OF_MINUTE.getId());
89+
}
7290
}
7391

7492
@Override
@@ -98,7 +116,8 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
98116
endValue,
99117
toReduce.get(0).startTime,
100118
toReduce.get(endIndex).endTime,
101-
resetComp
119+
resetComp,
120+
toReduce.get(0).rateUnit
102121
);
103122
}
104123

@@ -109,7 +128,8 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
109128

110129
@Override
111130
public double value() {
112-
return (endValue - startValue + resetCompensation) / (endTime - startTime);
131+
long rateUnitSeconds = rateUnit.getField().getBaseUnit().getDuration().toSeconds();
132+
return (endValue - startValue + resetCompensation) / (endTime - startTime) * MILLIS_IN_SECOND * rateUnitSeconds;
113133
}
114134

115135
@Override

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class TimeSeriesRateAggregator extends NumericMetricsAggregator.SingleVal
4545
private double currentStartValue = -1;
4646
private int currentTsid = -1;
4747

48+
private final Rounding.DateTimeUnit rateUnit;
49+
4850
// Unused parameters are so that the constructor implements `RateAggregatorSupplier`
4951
protected TimeSeriesRateAggregator(
5052
String name,
@@ -62,11 +64,12 @@ protected TimeSeriesRateAggregator(
6264
this.startTimes = bigArrays().newLongArray(1, true);
6365
this.endTimes = bigArrays().newLongArray(1, true);
6466
this.resetCompensations = bigArrays().newDoubleArray(1, true);
67+
this.rateUnit = rateUnit;
6568
}
6669

6770
@Override
6871
public InternalAggregation buildEmptyAggregation() {
69-
return new InternalResetTrackingRate(name, DocValueFormat.RAW, metadata(), 0, 0, 0, 0, 0);
72+
return new InternalResetTrackingRate(name, DocValueFormat.RAW, metadata(), 0, 0, 0, 0, 0, Rounding.DateTimeUnit.SECOND_OF_MINUTE);
7073
}
7174

7275
private void calculateLastBucket() {
@@ -138,7 +141,8 @@ public InternalResetTrackingRate buildAggregation(long owningBucketOrd) {
138141
endValues.get(owningBucketOrd),
139142
startTimes.get(owningBucketOrd),
140143
endTimes.get(owningBucketOrd),
141-
resetCompensations.get(owningBucketOrd)
144+
resetCompensations.get(owningBucketOrd),
145+
rateUnit == null ? Rounding.DateTimeUnit.SECOND_OF_MINUTE : rateUnit
142146
);
143147
}
144148

x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/InternalResetTrackingRateTests.java

Lines changed: 103 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.analytics.rate;
99

10+
import org.elasticsearch.common.Rounding;
1011
import org.elasticsearch.common.util.CollectionUtils;
1112
import org.elasticsearch.plugins.SearchPlugin;
1213
import org.elasticsearch.search.aggregations.Aggregation;
@@ -28,39 +29,77 @@
2829

2930
public class InternalResetTrackingRateTests extends InternalAggregationTestCase<InternalResetTrackingRate> {
3031

32+
private static final int MILLIS_IN_SECOND = 1_000;
33+
private static final int SECONDS_IN_MINUTE = 60;
34+
private static final int MILLIS_IN_MINUTE = MILLIS_IN_SECOND * SECONDS_IN_MINUTE;
35+
private static final int MINUTES_IN_HOUR = 60;
36+
private static final int MILLIS_IN_HOUR = MILLIS_IN_SECOND * SECONDS_IN_MINUTE * MINUTES_IN_HOUR;
37+
private static final int HOURS_IN_DAY = 24;
38+
private static final int MILLIS_IN_DAY = MILLIS_IN_SECOND * SECONDS_IN_MINUTE * MINUTES_IN_HOUR * HOURS_IN_DAY;
39+
private static final int DAYS_IN_WEEK = 7;
40+
private static final int MILLIS_IN_WEEK = MILLIS_IN_SECOND * SECONDS_IN_MINUTE * MINUTES_IN_HOUR * HOURS_IN_DAY * DAYS_IN_WEEK;
41+
private static final int MONTHS_IN_QUARTER = 3;
42+
private static final int MONTHS_IN_YEAR = 12;
43+
3144
@Override
3245
protected SearchPlugin registerPlugin() {
3346
return new AnalyticsPlugin();
3447
}
3548

3649
@Override
3750
protected InternalResetTrackingRate createTestInstance(String name, Map<String, Object> metadata) {
38-
return new InternalResetTrackingRate(name, null, metadata, 0, 0, 0, 0, 0);
51+
return new InternalResetTrackingRate(name, null, metadata, 0, 0, 0, 0, 0, Rounding.DateTimeUnit.SECOND_OF_MINUTE);
3952
}
4053

41-
private static InternalResetTrackingRate rate(double startValue, double endValue, long startTime, long endTime, double resetComp) {
42-
return new InternalResetTrackingRate("n", null, null, startValue, endValue, startTime, endTime, resetComp);
54+
private static InternalResetTrackingRate rate(
55+
double startValue,
56+
double endValue,
57+
long startTime,
58+
long endTime,
59+
double resetComp,
60+
Rounding.DateTimeUnit rateUnit
61+
) {
62+
return new InternalResetTrackingRate("n", null, null, startValue, endValue, startTime, endTime, resetComp, rateUnit);
4363
}
4464

45-
public void testReduction() {
46-
List<InternalAggregation> rates = List.of(
47-
rate(0, 10, 1000, 2000, 0),
48-
rate(10, 20, 2000, 3000, 0),
49-
rate(20, 5, 3000, 4000, 25), // internal reset
50-
rate(5, 15, 4000, 5000, 0),
51-
rate(0, 10, 5000, 6000, 0) // cross-boundary reset
52-
);
53-
InternalAggregation reduced = rates.get(0).reduce(rates, null);
54-
assertThat(reduced, instanceOf(Rate.class));
55-
assertThat(((Rate) reduced).getValue(), equalTo(0.01));
65+
public void testReductionSecond() {
66+
testReduction(Rounding.DateTimeUnit.SECOND_OF_MINUTE, 0.01 * MILLIS_IN_SECOND);
67+
}
68+
69+
public void testReductionMinute() {
70+
testReduction(Rounding.DateTimeUnit.MINUTES_OF_HOUR, 0.01 * MILLIS_IN_MINUTE);
71+
}
72+
73+
public void testReductionHour() {
74+
testReduction(Rounding.DateTimeUnit.HOUR_OF_DAY, 0.01 * MILLIS_IN_HOUR);
75+
}
76+
77+
public void testReductionDay() {
78+
testReduction(Rounding.DateTimeUnit.DAY_OF_MONTH, 0.01 * MILLIS_IN_DAY);
79+
}
80+
81+
public void testReductionWeek() {
82+
testReduction(Rounding.DateTimeUnit.WEEK_OF_WEEKYEAR, 0.01 * MILLIS_IN_WEEK);
83+
}
84+
85+
public void testReductionMonth() {
86+
testReduction(Rounding.DateTimeUnit.MONTH_OF_YEAR, 26297.46 * MILLIS_IN_SECOND);
87+
}
88+
89+
public void testReductionQuarter() {
90+
testReduction(Rounding.DateTimeUnit.QUARTER_OF_YEAR, 26297.46 * MILLIS_IN_SECOND * MONTHS_IN_QUARTER);
91+
}
92+
93+
public void testReductionYear() {
94+
testReduction(Rounding.DateTimeUnit.YEAR_OF_CENTURY, 26297.46 * MILLIS_IN_SECOND * MONTHS_IN_YEAR);
5695
}
5796

5897
@Override
5998
protected void assertReduced(InternalResetTrackingRate reduced, List<InternalResetTrackingRate> inputs) {
6099
for (InternalResetTrackingRate input : inputs) {
61-
assertEquals(0.01f, input.getValue(), 0.001);
100+
assertEquals(0.01f * MILLIS_IN_SECOND, input.getValue(), 0.01);
62101
}
63-
assertEquals(0.01f, reduced.getValue(), 0.001);
102+
assertEquals(0.01f * MILLIS_IN_SECOND, reduced.getValue(), 0.01);
64103
}
65104

66105
// Buckets must always be in-order so that we can detect resets between consecutive buckets
@@ -88,7 +127,7 @@ protected BuilderAndToReduce<InternalResetTrackingRate> randomResultsToReduce(St
88127
currentValue = 0;
89128
}
90129
if (randomInt(45) == 0) {
91-
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp));
130+
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp, Rounding.DateTimeUnit.SECOND_OF_MINUTE));
92131
startValue = currentValue;
93132
resetComp = 0;
94133
startTime = endTime;
@@ -98,7 +137,7 @@ protected BuilderAndToReduce<InternalResetTrackingRate> randomResultsToReduce(St
98137
endTime += 1000;
99138
currentValue += 10;
100139
}
101-
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp));
140+
internalRates.add(rate(startValue, currentValue, startTime, endTime, resetComp, Rounding.DateTimeUnit.SECOND_OF_MINUTE));
102141
return new BuilderAndToReduce<>(mock(RateAggregationBuilder.class), internalRates);
103142
}
104143

@@ -119,12 +158,42 @@ protected List<NamedXContentRegistry.Entry> getNamedXContents() {
119158
}
120159

121160
public void testIncludes() {
122-
InternalResetTrackingRate big = new InternalResetTrackingRate("n", null, null, 0, 0, 1000, 3000, 0);
123-
InternalResetTrackingRate small = new InternalResetTrackingRate("n", null, null, 0, 0, 1500, 2500, 0);
161+
InternalResetTrackingRate big = new InternalResetTrackingRate(
162+
"n",
163+
null,
164+
null,
165+
0,
166+
0,
167+
1000,
168+
3000,
169+
0,
170+
Rounding.DateTimeUnit.SECOND_OF_MINUTE
171+
);
172+
InternalResetTrackingRate small = new InternalResetTrackingRate(
173+
"n",
174+
null,
175+
null,
176+
0,
177+
0,
178+
1500,
179+
2500,
180+
0,
181+
Rounding.DateTimeUnit.SECOND_OF_MINUTE
182+
);
124183
assertTrue(big.includes(small));
125184
assertFalse(small.includes(big));
126185

127-
InternalResetTrackingRate unrelated = new InternalResetTrackingRate("n", null, null, 0, 0, 100000, 1000010, 0);
186+
InternalResetTrackingRate unrelated = new InternalResetTrackingRate(
187+
"n",
188+
null,
189+
null,
190+
0,
191+
0,
192+
100000,
193+
1000010,
194+
0,
195+
Rounding.DateTimeUnit.SECOND_OF_MINUTE
196+
);
128197
assertFalse(big.includes(unrelated));
129198
assertFalse(unrelated.includes(big));
130199
assertFalse(small.includes(unrelated));
@@ -135,4 +204,17 @@ public void testIncludes() {
135204
protected InternalResetTrackingRate mutateInstance(InternalResetTrackingRate instance) {
136205
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
137206
}
207+
208+
private static void testReduction(final Rounding.DateTimeUnit dateTimeUnit, double operand) {
209+
List<InternalAggregation> rates = List.of(
210+
rate(0, 10, 1000, 2000, 0, dateTimeUnit),
211+
rate(10, 20, 2000, 3000, 0, dateTimeUnit),
212+
rate(20, 5, 3000, 4000, 25, dateTimeUnit), // internal reset
213+
rate(5, 15, 4000, 5000, 0, dateTimeUnit),
214+
rate(0, 10, 5000, 6000, 0, dateTimeUnit) // cross-boundary reset
215+
);
216+
InternalAggregation reduced = rates.get(0).reduce(rates, null);
217+
assertThat(reduced, instanceOf(Rate.class));
218+
assertThat(((Rate) reduced).getValue(), equalTo(operand));
219+
}
138220
}

x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregatorTests.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040

4141
public class TimeSeriesRateAggregatorTests extends AggregatorTestCase {
4242

43+
private static final int MILLIS_IN_SECOND = 1_000;
44+
4345
@Override
4446
protected List<SearchPlugin> getSearchPlugins() {
4547
return List.of(new AggregationsPlugin(), new AnalyticsPlugin());
@@ -51,8 +53,14 @@ public void testSimple() throws IOException {
5153
tsBuilder.subAggregation(builder);
5254
Consumer<InternalTimeSeries> verifier = r -> {
5355
assertThat(r.getBuckets(), hasSize(2));
54-
assertThat(((Rate) r.getBucketByKey("{dim=1}").getAggregations().asList().get(0)).getValue(), closeTo(59.0 / 3000.0, 0.00001));
55-
assertThat(((Rate) r.getBucketByKey("{dim=2}").getAggregations().asList().get(0)).getValue(), closeTo(206.0 / 4000.0, 0.00001));
56+
assertThat(
57+
((Rate) r.getBucketByKey("{dim=1}").getAggregations().asList().get(0)).getValue(),
58+
closeTo(59.0 / 3000.0 * MILLIS_IN_SECOND, 0.00001)
59+
);
60+
assertThat(
61+
((Rate) r.getBucketByKey("{dim=2}").getAggregations().asList().get(0)).getValue(),
62+
closeTo(206.0 / 4000.0 * MILLIS_IN_SECOND, 0.00001)
63+
);
5664
};
5765
AggTestConfig aggTestConfig = new AggTestConfig(tsBuilder, timeStampField(), counterField("counter_field"))
5866
.withSplitLeavesIntoSeperateAggregators(false);
@@ -77,20 +85,20 @@ public void testNestedWithinDateHistogram() throws IOException {
7785
InternalDateHistogram hb = r.getBucketByKey("{dim=1}").getAggregations().get("date");
7886
{
7987
Rate rate = hb.getBuckets().get(1).getAggregations().get("counter_field");
80-
assertThat(rate.getValue(), closeTo((60 - 37 + 14) / 2000.0, 0.00001));
88+
assertThat(rate.getValue(), closeTo((60 - 37 + 14) / 2000.0 * MILLIS_IN_SECOND, 0.00001));
8189
}
8290
{
8391
Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field");
84-
assertThat(rate.getValue(), closeTo((37 - 15) / 1000.0, 0.00001));
92+
assertThat(rate.getValue(), closeTo((37 - 15) / 1000.0 * MILLIS_IN_SECOND, 0.00001));
8593
}
8694
hb = r.getBucketByKey("{dim=2}").getAggregations().get("date");
8795
{
8896
Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field");
89-
assertThat(rate.getValue(), closeTo((150 - 74) / 1000.0, 0.00001));
97+
assertThat(rate.getValue(), closeTo((150 - 74) / 1000.0 * MILLIS_IN_SECOND, 0.00001));
9098
}
9199
{
92100
Rate rate = hb.getBuckets().get(1).getAggregations().get("counter_field");
93-
assertThat(rate.getValue(), closeTo(90 / 2000.0, 0.00001));
101+
assertThat(rate.getValue(), closeTo(90 / 2000.0 * MILLIS_IN_SECOND, 0.00001));
94102
}
95103
};
96104

0 commit comments

Comments
 (0)