Skip to content

Commit 7389c58

Browse files
pabloemjordan-powerselasticsearchmachine
authored
Increase timeseries function (#134934)
* Implementation of increase function for timeseries applications * rebasing aigu * fixup * fixup * Test repro * cleanup and ready to go * Fix formatting in X-RateGroupingAggregatorFunction.java.st This was causing an infinite CI loop where the formatter and the code generator were fighting. * [CI] Update transport version definitions * adding capability for tsfn increase * comments and fixup * csvtests * fixup? * fixup * fixcsvtests --------- Co-authored-by: Jordan Powers <[email protected]> Co-authored-by: elasticsearchmachine <[email protected]>
1 parent dacd9a7 commit 7389c58

File tree

12 files changed

+584
-48
lines changed

12 files changed

+584
-48
lines changed

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java

Lines changed: 25 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java

Lines changed: 25 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java

Lines changed: 25 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateGroupingAggregatorFunction.java.st

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ import java.util.List;
3636
public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggregatorFunction {
3737

3838
public static final class FunctionSupplier implements AggregatorFunctionSupplier {
39+
// Overriding constructor to support isRateOverTime flag
40+
private final boolean isRateOverTime;
41+
42+
public FunctionSupplier(boolean isRateOverTime) {
43+
this.isRateOverTime = isRateOverTime;
44+
}
45+
3946
@Override
4047
public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
4148
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
@@ -53,7 +60,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
5360

5461
@Override
5562
public Rate$Type$GroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List<Integer> channels) {
56-
return new Rate$Type$GroupingAggregatorFunction(channels, driverContext);
63+
return new Rate$Type$GroupingAggregatorFunction(channels, driverContext, isRateOverTime);
5764
}
5865

5966
@Override
@@ -74,11 +81,13 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
7481
private final DriverContext driverContext;
7582
private final BigArrays bigArrays;
7683
private ObjectArray<ReducedState> reducedStates;
84+
private final boolean isRateOverTime;
7785

78-
public Rate$Type$GroupingAggregatorFunction(List<Integer> channels, DriverContext driverContext) {
86+
public Rate$Type$GroupingAggregatorFunction(List<Integer> channels, DriverContext driverContext, boolean isRateOverTime) {
7987
this.channels = channels;
8088
this.driverContext = driverContext;
8189
this.bigArrays = driverContext.bigArrays();
90+
this.isRateOverTime = isRateOverTime;
8291
ObjectArray<Buffer> buffers = driverContext.bigArrays().newObjectArray(256);
8392
try {
8493
this.reducedStates = driverContext.bigArrays().newObjectArray(256);
@@ -565,9 +574,9 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
565574
}
566575
final double rate;
567576
if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) {
568-
rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group));
577+
rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group), isRateOverTime);
569578
} else {
570-
rate = computeRateWithoutExtrapolate(state);
579+
rate = computeRateWithoutExtrapolate(state, isRateOverTime);
571580
}
572581
rates.appendDouble(rate);
573582
}
@@ -636,13 +645,17 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
636645
}
637646
}
638647

639-
private static double computeRateWithoutExtrapolate(ReducedState state) {
648+
private static double computeRateWithoutExtrapolate(ReducedState state, boolean isRateOverTime) {
640649
assert state.samples >= 2 : "rate requires at least two samples; got " + state.samples;
641650
final long firstTS = state.intervals[state.intervals.length - 1].t2;
642651
final long lastTS = state.intervals[0].t1;
643652
double firstValue = state.intervals[state.intervals.length - 1].v2;
644653
double lastValue = state.intervals[0].v1 + state.resets;
645-
return (lastValue - firstValue) * 1000.0 / (lastTS - firstTS);
654+
if (isRateOverTime) {
655+
return (lastValue - firstValue) * 1000.0 / (lastTS - firstTS);
656+
} else {
657+
return lastValue - firstValue;
658+
}
646659
}
647660

648661
/**
@@ -654,7 +667,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
654667
* We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between
655668
* samples (which is our guess for where the series actually starts or ends).
656669
*/
657-
private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd) {
670+
private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd, boolean isRateOverTime) {
658671
assert state.samples >= 2 : "rate requires at least two samples; got " + state.samples;
659672
final long firstTS = state.intervals[state.intervals.length - 1].t2;
660673
final long lastTS = state.intervals[0].t1;
@@ -677,6 +690,10 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
677690
}
678691
lastValue = lastValue + endGap * slope;
679692
}
680-
return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart);
693+
if (isRateOverTime) {
694+
return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart);
695+
} else {
696+
return lastValue - firstValue;
697+
}
681698
}
682699
}

0 commit comments

Comments
 (0)