From 5eca698d74fd68a3d830a211d232386c4070500e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 3 Apr 2025 17:20:30 -0700 Subject: [PATCH 1/6] Extrapolate rate aggregation --- .../aggregation/RateDoubleAggregator.java | 99 +++++++++++++++---- .../aggregation/RateFloatAggregator.java | 99 +++++++++++++++---- .../aggregation/RateIntAggregator.java | 99 +++++++++++++++---- .../aggregation/RateLongAggregator.java | 99 +++++++++++++++---- .../RateDoubleGroupingAggregatorFunction.java | 12 ++- .../RateFloatGroupingAggregatorFunction.java | 12 ++- .../RateIntGroupingAggregatorFunction.java | 12 ++- .../RateLongGroupingAggregatorFunction.java | 12 ++- ...esGroupingAggregatorEvaluationContext.java | 20 ++++ .../aggregation/X-RateAggregator.java.st | 99 +++++++++++++++---- .../operator/HashAggregationOperator.java | 8 +- .../TimeSeriesAggregationOperator.java | 24 +++++ 12 files changed, 476 insertions(+), 119 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TimeSeriesGroupingAggregatorEvaluationContext.java diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java index e412ec91bf5ee..f3c5886b1009d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java @@ -35,6 +35,7 @@ value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "DOUBLE_BLOCK"), + @IntermediateState(name = "sampleCounts", type = "INT"), @IntermediateState(name = "resets", type = "DOUBLE") } ) public class RateDoubleAggregator { @@ -52,10 +53,11 @@ public static void combineIntermediate( int groupId, LongBlock timestamps, DoubleBlock values, + int sampleCount, double reset, int otherPosition ) { - current.combine(groupId, timestamps, values, reset, otherPosition); + current.combine(groupId, timestamps, values, sampleCount, reset, otherPosition); } public static void combineStates( @@ -70,15 +72,17 @@ public static void combineStates( public static Block evaluateFinal( DoubleRateGroupingState state, IntVector selected, - GroupingAggregatorEvaluationContext evaluatorContext + GroupingAggregatorEvaluationContext evalContext ) { - return state.evaluateFinal(selected, evaluatorContext.blockFactory()); + return state.evaluateFinal(selected, evalContext); } private static class DoubleRateState { static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(DoubleRateState.class); final long[] timestamps; // descending order final double[] values; + // the timestamps and values arrays might have collapsed to fewer values than the actual sample count + int sampleCount = 0; double reset = 0; DoubleRateState(int initialSize) { @@ -89,6 +93,7 @@ private static class DoubleRateState { DoubleRateState(long[] ts, double[] vs) { this.timestamps = ts; this.values = vs; + this.sampleCount = values.length; } private double dv(double v0, double v1) { @@ -102,6 +107,7 @@ void append(long t, double v) { reset += dv(v, values[1]) + dv(values[1], values[0]) - dv(v, values[0]); timestamps[1] = t; values[1] = v; + sampleCount++; } int entries() { @@ -158,7 +164,7 @@ void append(int groupId, long timestamp, double value) { } } - void combine(int groupId, LongBlock 
timestamps, DoubleBlock values, double reset, int otherPosition) { + void combine(int groupId, LongBlock timestamps, DoubleBlock values, int sampleCount, double reset, int otherPosition) { final int valueCount = timestamps.getValueCount(otherPosition); if (valueCount == 0) { return; @@ -170,6 +176,7 @@ void combine(int groupId, LongBlock timestamps, DoubleBlock values, double reset adjustBreaker(DoubleRateState.bytesUsed(valueCount)); state = new DoubleRateState(valueCount); state.reset = reset; + state.sampleCount = sampleCount; states.set(groupId, state); // TODO: add bulk_copy to Block for (int i = 0; i < valueCount; i++) { @@ -180,6 +187,7 @@ void combine(int groupId, LongBlock timestamps, DoubleBlock values, double reset adjustBreaker(DoubleRateState.bytesUsed(state.entries() + valueCount)); var newState = new DoubleRateState(state.entries() + valueCount); newState.reset = state.reset + reset; + newState.sampleCount = state.sampleCount + sampleCount; states.set(groupId, newState); merge(state, newState, firstIndex, valueCount, timestamps, values); adjustBreaker(-DoubleRateState.bytesUsed(state.entries())); // old state @@ -227,6 +235,7 @@ void combineState(int groupId, DoubleRateGroupingState otherState, int otherGrou adjustBreaker(DoubleRateState.bytesUsed(len)); curr = new DoubleRateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len)); curr.reset = other.reset; + curr.sampleCount = other.sampleCount; states.set(groupId, curr); } else { states.set(groupId, mergeState(curr, other)); @@ -238,6 +247,7 @@ DoubleRateState mergeState(DoubleRateState s1, DoubleRateState s2) { adjustBreaker(DoubleRateState.bytesUsed(newLen)); var dst = new DoubleRateState(newLen); dst.reset = s1.reset + s2.reset; + dst.sampleCount = s1.sampleCount + s2.sampleCount; int i = 0, j = 0, k = 0; while (i < s1.entries() && j < s2.entries()) { if (s1.timestamps[i] > s2.timestamps[j]) { @@ -276,6 +286,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive try ( LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); DoubleBlock.Builder values = blockFactory.newDoubleBlockBuilder(positionCount * 2); + IntVector.FixedBuilder sampleCounts = blockFactory.newIntVectorFixedBuilder(positionCount); DoubleVector.FixedBuilder resets = blockFactory.newDoubleVectorFixedBuilder(positionCount) ) { for (int i = 0; i < positionCount; i++) { @@ -293,45 +304,91 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive values.appendDouble(v); } values.endPositionEntry(); - + sampleCounts.appendInt(i, state.sampleCount); resets.appendDouble(i, state.reset); } else { timestamps.appendNull(); values.appendNull(); + sampleCounts.appendInt(i, 0); resets.appendDouble(i, 0); } } blocks[offset] = timestamps.build(); blocks[offset + 1] = values.build(); - blocks[offset + 2] = resets.build().asBlock(); + blocks[offset + 2] = sampleCounts.build().asBlock(); + blocks[offset + 3] = resets.build().asBlock(); } } - Block evaluateFinal(IntVector selected, BlockFactory blockFactory) { + private static double computeRate(DoubleRateState state, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + final double firstValue = state.values[len - 1]; + final double lastValue = state.values[0] 
+ reset; + return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); + } + + private static double computeRate(DoubleRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + double firstValue = state.values[len - 1]; + double lastValue = state.values[0] + reset; + final double sampleTS = lastTS - firstTS; + final double averageSampleInterval = sampleTS / state.sampleCount; + final double slope = (lastValue - firstValue) / sampleTS; + double startGap = firstTS - rangeStart; + if (startGap > 0) { + if (startGap > averageSampleInterval * 1.1) { + startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval + } + firstValue = Math.max(0.0, firstValue - startGap * slope); + } + double endGap = rangeEnd - lastTS; + if (endGap > 0) { + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; + } + return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { int positionCount = selected.getPositionCount(); - try (DoubleBlock.Builder rates = blockFactory.newDoubleBlockBuilder(positionCount)) { + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { for (int p = 0; p < positionCount; p++) { final var groupId = selected.getInt(p); final var state = groupId < states.size() ? states.get(groupId) : null; - if (state == null) { + if (state == null || state.sampleCount < 2) { rates.appendNull(); continue; } int len = state.entries(); - long dt = state.timestamps[0] - state.timestamps[len - 1]; - if (dt == 0) { - // TODO: maybe issue warning when we don't have enough sample? 
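Reviewer note (not part of the patch): the extrapolation above is easiest to sanity-check in isolation. The sketch below mirrors the new range-based computeRate with illustrative names and is an assumption-free restatement of the arithmetic, not the patched class itself: timestamps descend, counter resets are folded into the last value, and when the nearest sample sits more than 1.1x the average sample interval from a bucket edge the extrapolated gap is capped at half that interval.

    // Illustrative sketch (not patch code) of the extrapolated-rate arithmetic.
    class RateExtrapolationSketch {
        static double extrapolatedRate(long[] ts /* descending */, double[] vs, double reset,
                                       int sampleCount, long rangeStart, long rangeEnd, long unitInMillis) {
            final int len = ts.length;
            final long firstTS = ts[len - 1];
            final long lastTS = ts[0];
            for (int i = 1; i < len; i++) {
                if (vs[i - 1] < vs[i]) {
                    reset += vs[i]; // counter dropped between samples: account for the reset
                }
            }
            double firstValue = vs[len - 1];
            double lastValue = vs[0] + reset;
            final double sampleSpan = lastTS - firstTS;
            final double avgInterval = sampleSpan / sampleCount;
            final double slope = (lastValue - firstValue) / sampleSpan;
            double startGap = firstTS - rangeStart;
            if (startGap > 0) {
                if (startGap > avgInterval * 1.1) {
                    startGap = avgInterval / 2.0; // cap extrapolation at half the average interval
                }
                firstValue = Math.max(0.0, firstValue - startGap * slope);
            }
            double endGap = rangeEnd - lastTS;
            if (endGap > 0) {
                if (endGap > avgInterval * 1.1) {
                    endGap = avgInterval / 2.0;
                }
                lastValue = lastValue + endGap * slope;
            }
            return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart);
        }

        public static void main(String[] args) {
            // Samples (t=10s, v=10) and (t=50s, v=40) in a [0s, 60s) bucket: the slope
            // 30/40000ms is extended 7.5 counts on each side (10s gap at each edge),
            // giving (47.5 - 2.5) * 1000 / 60000 == 0.75 per second.
            System.out.println(extrapolatedRate(new long[] { 50_000, 10_000 }, new double[] { 40, 10 }, 0, 2, 0, 60_000, 1_000));
        }
    }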
- rates.appendNull(); + final double rate; + if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { + rate = computeRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis); } else { - double reset = state.reset; - for (int i = 1; i < len; i++) { - if (state.values[i - 1] < state.values[i]) { - reset += state.values[i]; - } - } - double dv = state.values[0] - state.values[len - 1] + reset; - rates.appendDouble(dv * unitInMillis / dt); + rate = computeRate(state, unitInMillis); } + rates.appendDouble(rate); } return rates.build(); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java index dcb54a6c18813..207bfa9e885a3 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java @@ -36,6 +36,7 @@ value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "FLOAT_BLOCK"), + @IntermediateState(name = "sampleCounts", type = "INT"), @IntermediateState(name = "resets", type = "DOUBLE") } ) public class RateFloatAggregator { @@ -53,10 +54,11 @@ public static void combineIntermediate( int groupId, LongBlock timestamps, FloatBlock values, + int sampleCount, double reset, int otherPosition ) { - current.combine(groupId, timestamps, values, reset, otherPosition); + current.combine(groupId, timestamps, values, sampleCount, reset, otherPosition); } public static void combineStates( @@ -71,15 +73,17 @@ public static void combineStates( public static Block evaluateFinal( FloatRateGroupingState state, IntVector selected, - GroupingAggregatorEvaluationContext evaluatorContext + GroupingAggregatorEvaluationContext evalContext ) { - return state.evaluateFinal(selected, evaluatorContext.blockFactory()); + return state.evaluateFinal(selected, evalContext); } private static class FloatRateState { static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(FloatRateState.class); final long[] timestamps; // descending order final float[] values; + // the timestamps and values arrays might have collapsed to fewer values than the actual sample count + int sampleCount = 0; double reset = 0; FloatRateState(int initialSize) { @@ -90,6 +94,7 @@ private static class FloatRateState { FloatRateState(long[] ts, float[] vs) { this.timestamps = ts; this.values = vs; + this.sampleCount = values.length; } private float dv(float v0, float v1) { @@ -103,6 +108,7 @@ void append(long t, float v) { reset += dv(v, values[1]) + dv(values[1], values[0]) - dv(v, values[0]); timestamps[1] = t; values[1] = v; + sampleCount++; } int entries() { @@ -159,7 +165,7 @@ void append(int groupId, long timestamp, float value) { } } - void combine(int groupId, LongBlock timestamps, FloatBlock values, double reset, int otherPosition) { + void combine(int groupId, LongBlock timestamps, FloatBlock values, int sampleCount, double reset, int otherPosition) { final int valueCount = timestamps.getValueCount(otherPosition); if (valueCount == 0) { return; @@ -171,6 +177,7 @@ void combine(int groupId, LongBlock timestamps, FloatBlock values, double reset, adjustBreaker(FloatRateState.bytesUsed(valueCount)); state = new FloatRateState(valueCount); 
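Reviewer note (not part of the patch): each grouping state retains only a collapsed set of samples (at minimum the first and last, plus accumulated resets), so entries() can be smaller than the number of samples actually observed. Carrying sampleCount through combine/merge is what lets the final pass estimate the average sample interval for extrapolation. The invariant, stated as illustrative comments:

    // Illustrative invariant (not patch code): merging two summaries sums the
    // true sample counts even though the retained entries may stay collapsed.
    //   merged.sampleCount == s1.sampleCount + s2.sampleCount
    //   merged.entries()   <= s1.entries() + s2.entries()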
state.reset = reset; + state.sampleCount = sampleCount; states.set(groupId, state); // TODO: add bulk_copy to Block for (int i = 0; i < valueCount; i++) { @@ -181,6 +188,7 @@ void combine(int groupId, LongBlock timestamps, FloatBlock values, double reset, adjustBreaker(FloatRateState.bytesUsed(state.entries() + valueCount)); var newState = new FloatRateState(state.entries() + valueCount); newState.reset = state.reset + reset; + newState.sampleCount = state.sampleCount + sampleCount; states.set(groupId, newState); merge(state, newState, firstIndex, valueCount, timestamps, values); adjustBreaker(-FloatRateState.bytesUsed(state.entries())); // old state @@ -228,6 +236,7 @@ void combineState(int groupId, FloatRateGroupingState otherState, int otherGroup adjustBreaker(FloatRateState.bytesUsed(len)); curr = new FloatRateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len)); curr.reset = other.reset; + curr.sampleCount = other.sampleCount; states.set(groupId, curr); } else { states.set(groupId, mergeState(curr, other)); @@ -239,6 +248,7 @@ FloatRateState mergeState(FloatRateState s1, FloatRateState s2) { adjustBreaker(FloatRateState.bytesUsed(newLen)); var dst = new FloatRateState(newLen); dst.reset = s1.reset + s2.reset; + dst.sampleCount = s1.sampleCount + s2.sampleCount; int i = 0, j = 0, k = 0; while (i < s1.entries() && j < s2.entries()) { if (s1.timestamps[i] > s2.timestamps[j]) { @@ -277,6 +287,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive try ( LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); FloatBlock.Builder values = blockFactory.newFloatBlockBuilder(positionCount * 2); + IntVector.FixedBuilder sampleCounts = blockFactory.newIntVectorFixedBuilder(positionCount); DoubleVector.FixedBuilder resets = blockFactory.newDoubleVectorFixedBuilder(positionCount) ) { for (int i = 0; i < positionCount; i++) { @@ -294,45 +305,91 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive values.appendFloat(v); } values.endPositionEntry(); - + sampleCounts.appendInt(i, state.sampleCount); resets.appendDouble(i, state.reset); } else { timestamps.appendNull(); values.appendNull(); + sampleCounts.appendInt(i, 0); resets.appendDouble(i, 0); } } blocks[offset] = timestamps.build(); blocks[offset + 1] = values.build(); - blocks[offset + 2] = resets.build().asBlock(); + blocks[offset + 2] = sampleCounts.build().asBlock(); + blocks[offset + 3] = resets.build().asBlock(); } } - Block evaluateFinal(IntVector selected, BlockFactory blockFactory) { + private static double computeRate(FloatRateState state, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + final double firstValue = state.values[len - 1]; + final double lastValue = state.values[0] + reset; + return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); + } + + private static double computeRate(FloatRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + 
double firstValue = state.values[len - 1]; + double lastValue = state.values[0] + reset; + final double sampleTS = lastTS - firstTS; + final double averageSampleInterval = sampleTS / state.sampleCount; + final double slope = (lastValue - firstValue) / sampleTS; + double startGap = firstTS - rangeStart; + if (startGap > 0) { + if (startGap > averageSampleInterval * 1.1) { + startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval + } + firstValue = Math.max(0.0, firstValue - startGap * slope); + } + double endGap = rangeEnd - lastTS; + if (endGap > 0) { + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; + } + return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { int positionCount = selected.getPositionCount(); - try (DoubleBlock.Builder rates = blockFactory.newDoubleBlockBuilder(positionCount)) { + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { for (int p = 0; p < positionCount; p++) { final var groupId = selected.getInt(p); final var state = groupId < states.size() ? states.get(groupId) : null; - if (state == null) { + if (state == null || state.sampleCount < 2) { rates.appendNull(); continue; } int len = state.entries(); - long dt = state.timestamps[0] - state.timestamps[len - 1]; - if (dt == 0) { - // TODO: maybe issue warning when we don't have enough sample? - rates.appendNull(); + final double rate; + if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { + rate = computeRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis); } else { - double reset = state.reset; - for (int i = 1; i < len; i++) { - if (state.values[i - 1] < state.values[i]) { - reset += state.values[i]; - } - } - double dv = state.values[0] - state.values[len - 1] + reset; - rates.appendDouble(dv * unitInMillis / dt); + rate = computeRate(state, unitInMillis); } + rates.appendDouble(rate); } return rates.build(); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java index f8e0c0b70aed2..97940fbef7dad 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java @@ -36,6 +36,7 @@ value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "INT_BLOCK"), + @IntermediateState(name = "sampleCounts", type = "INT"), @IntermediateState(name = "resets", type = "DOUBLE") } ) public class RateIntAggregator { @@ -53,10 +54,11 @@ public static void combineIntermediate( int groupId, LongBlock timestamps, IntBlock values, + int sampleCount, double reset, int otherPosition ) { - current.combine(groupId, timestamps, values, reset, otherPosition); + current.combine(groupId, timestamps, values, sampleCount, reset, otherPosition); } public static void combineStates( @@ -71,15 +73,17 @@ public static void combineStates( public static Block evaluateFinal( IntRateGroupingState state, IntVector selected, - GroupingAggregatorEvaluationContext evaluatorContext 
+ GroupingAggregatorEvaluationContext evalContext ) { - return state.evaluateFinal(selected, evaluatorContext.blockFactory()); + return state.evaluateFinal(selected, evalContext); } private static class IntRateState { static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(IntRateState.class); final long[] timestamps; // descending order final int[] values; + // the timestamps and values arrays might have collapsed to fewer values than the actual sample count + int sampleCount = 0; double reset = 0; IntRateState(int initialSize) { @@ -90,6 +94,7 @@ private static class IntRateState { IntRateState(long[] ts, int[] vs) { this.timestamps = ts; this.values = vs; + this.sampleCount = values.length; } private int dv(int v0, int v1) { @@ -103,6 +108,7 @@ void append(long t, int v) { reset += dv(v, values[1]) + dv(values[1], values[0]) - dv(v, values[0]); timestamps[1] = t; values[1] = v; + sampleCount++; } int entries() { @@ -159,7 +165,7 @@ void append(int groupId, long timestamp, int value) { } } - void combine(int groupId, LongBlock timestamps, IntBlock values, double reset, int otherPosition) { + void combine(int groupId, LongBlock timestamps, IntBlock values, int sampleCount, double reset, int otherPosition) { final int valueCount = timestamps.getValueCount(otherPosition); if (valueCount == 0) { return; @@ -171,6 +177,7 @@ void combine(int groupId, LongBlock timestamps, IntBlock values, double reset, i adjustBreaker(IntRateState.bytesUsed(valueCount)); state = new IntRateState(valueCount); state.reset = reset; + state.sampleCount = sampleCount; states.set(groupId, state); // TODO: add bulk_copy to Block for (int i = 0; i < valueCount; i++) { @@ -181,6 +188,7 @@ void combine(int groupId, LongBlock timestamps, IntBlock values, double reset, i adjustBreaker(IntRateState.bytesUsed(state.entries() + valueCount)); var newState = new IntRateState(state.entries() + valueCount); newState.reset = state.reset + reset; + newState.sampleCount = state.sampleCount + sampleCount; states.set(groupId, newState); merge(state, newState, firstIndex, valueCount, timestamps, values); adjustBreaker(-IntRateState.bytesUsed(state.entries())); // old state @@ -228,6 +236,7 @@ void combineState(int groupId, IntRateGroupingState otherState, int otherGroupId adjustBreaker(IntRateState.bytesUsed(len)); curr = new IntRateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len)); curr.reset = other.reset; + curr.sampleCount = other.sampleCount; states.set(groupId, curr); } else { states.set(groupId, mergeState(curr, other)); @@ -239,6 +248,7 @@ IntRateState mergeState(IntRateState s1, IntRateState s2) { adjustBreaker(IntRateState.bytesUsed(newLen)); var dst = new IntRateState(newLen); dst.reset = s1.reset + s2.reset; + dst.sampleCount = s1.sampleCount + s2.sampleCount; int i = 0, j = 0, k = 0; while (i < s1.entries() && j < s2.entries()) { if (s1.timestamps[i] > s2.timestamps[j]) { @@ -277,6 +287,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive try ( LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); IntBlock.Builder values = blockFactory.newIntBlockBuilder(positionCount * 2); + IntVector.FixedBuilder sampleCounts = blockFactory.newIntVectorFixedBuilder(positionCount); DoubleVector.FixedBuilder resets = blockFactory.newDoubleVectorFixedBuilder(positionCount) ) { for (int i = 0; i < positionCount; i++) { @@ -294,45 +305,91 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive values.appendInt(v); 
} values.endPositionEntry(); - + sampleCounts.appendInt(i, state.sampleCount); resets.appendDouble(i, state.reset); } else { timestamps.appendNull(); values.appendNull(); + sampleCounts.appendInt(i, 0); resets.appendDouble(i, 0); } } blocks[offset] = timestamps.build(); blocks[offset + 1] = values.build(); - blocks[offset + 2] = resets.build().asBlock(); + blocks[offset + 2] = sampleCounts.build().asBlock(); + blocks[offset + 3] = resets.build().asBlock(); } } - Block evaluateFinal(IntVector selected, BlockFactory blockFactory) { + private static double computeRate(IntRateState state, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + final double firstValue = state.values[len - 1]; + final double lastValue = state.values[0] + reset; + return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); + } + + private static double computeRate(IntRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + double firstValue = state.values[len - 1]; + double lastValue = state.values[0] + reset; + final double sampleTS = lastTS - firstTS; + final double averageSampleInterval = sampleTS / state.sampleCount; + final double slope = (lastValue - firstValue) / sampleTS; + double startGap = firstTS - rangeStart; + if (startGap > 0) { + if (startGap > averageSampleInterval * 1.1) { + startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval + } + firstValue = Math.max(0.0, firstValue - startGap * slope); + } + double endGap = rangeEnd - lastTS; + if (endGap > 0) { + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; + } + return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { int positionCount = selected.getPositionCount(); - try (DoubleBlock.Builder rates = blockFactory.newDoubleBlockBuilder(positionCount)) { + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { for (int p = 0; p < positionCount; p++) { final var groupId = selected.getInt(p); final var state = groupId < states.size() ? states.get(groupId) : null; - if (state == null) { + if (state == null || state.sampleCount < 2) { rates.appendNull(); continue; } int len = state.entries(); - long dt = state.timestamps[0] - state.timestamps[len - 1]; - if (dt == 0) { - // TODO: maybe issue warning when we don't have enough sample? 
- rates.appendNull(); + final double rate; + if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { + rate = computeRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis); } else { - double reset = state.reset; - for (int i = 1; i < len; i++) { - if (state.values[i - 1] < state.values[i]) { - reset += state.values[i]; - } - } - double dv = state.values[0] - state.values[len - 1] + reset; - rates.appendDouble(dv * unitInMillis / dt); + rate = computeRate(state, unitInMillis); } + rates.appendDouble(rate); } return rates.build(); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java index ba636accf7a78..68186d18c49e4 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java @@ -35,6 +35,7 @@ value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "LONG_BLOCK"), + @IntermediateState(name = "sampleCounts", type = "INT"), @IntermediateState(name = "resets", type = "DOUBLE") } ) public class RateLongAggregator { @@ -52,10 +53,11 @@ public static void combineIntermediate( int groupId, LongBlock timestamps, LongBlock values, + int sampleCount, double reset, int otherPosition ) { - current.combine(groupId, timestamps, values, reset, otherPosition); + current.combine(groupId, timestamps, values, sampleCount, reset, otherPosition); } public static void combineStates( @@ -70,15 +72,17 @@ public static void combineStates( public static Block evaluateFinal( LongRateGroupingState state, IntVector selected, - GroupingAggregatorEvaluationContext evaluatorContext + GroupingAggregatorEvaluationContext evalContext ) { - return state.evaluateFinal(selected, evaluatorContext.blockFactory()); + return state.evaluateFinal(selected, evalContext); } private static class LongRateState { static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(LongRateState.class); final long[] timestamps; // descending order final long[] values; + // the timestamps and values arrays might have collapsed to fewer values than the actual sample count + int sampleCount = 0; double reset = 0; LongRateState(int initialSize) { @@ -89,6 +93,7 @@ private static class LongRateState { LongRateState(long[] ts, long[] vs) { this.timestamps = ts; this.values = vs; + this.sampleCount = values.length; } private long dv(long v0, long v1) { @@ -102,6 +107,7 @@ void append(long t, long v) { reset += dv(v, values[1]) + dv(values[1], values[0]) - dv(v, values[0]); timestamps[1] = t; values[1] = v; + sampleCount++; } int entries() { @@ -158,7 +164,7 @@ void append(int groupId, long timestamp, long value) { } } - void combine(int groupId, LongBlock timestamps, LongBlock values, double reset, int otherPosition) { + void combine(int groupId, LongBlock timestamps, LongBlock values, int sampleCount, double reset, int otherPosition) { final int valueCount = timestamps.getValueCount(otherPosition); if (valueCount == 0) { return; @@ -170,6 +176,7 @@ void combine(int groupId, LongBlock timestamps, LongBlock values, double reset, adjustBreaker(LongRateState.bytesUsed(valueCount)); state = new LongRateState(valueCount); state.reset = reset; + 
state.sampleCount = sampleCount; states.set(groupId, state); // TODO: add bulk_copy to Block for (int i = 0; i < valueCount; i++) { @@ -180,6 +187,7 @@ void combine(int groupId, LongBlock timestamps, LongBlock values, double reset, adjustBreaker(LongRateState.bytesUsed(state.entries() + valueCount)); var newState = new LongRateState(state.entries() + valueCount); newState.reset = state.reset + reset; + newState.sampleCount = state.sampleCount + sampleCount; states.set(groupId, newState); merge(state, newState, firstIndex, valueCount, timestamps, values); adjustBreaker(-LongRateState.bytesUsed(state.entries())); // old state @@ -227,6 +235,7 @@ void combineState(int groupId, LongRateGroupingState otherState, int otherGroupI adjustBreaker(LongRateState.bytesUsed(len)); curr = new LongRateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len)); curr.reset = other.reset; + curr.sampleCount = other.sampleCount; states.set(groupId, curr); } else { states.set(groupId, mergeState(curr, other)); @@ -238,6 +247,7 @@ LongRateState mergeState(LongRateState s1, LongRateState s2) { adjustBreaker(LongRateState.bytesUsed(newLen)); var dst = new LongRateState(newLen); dst.reset = s1.reset + s2.reset; + dst.sampleCount = s1.sampleCount + s2.sampleCount; int i = 0, j = 0, k = 0; while (i < s1.entries() && j < s2.entries()) { if (s1.timestamps[i] > s2.timestamps[j]) { @@ -276,6 +286,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive try ( LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); LongBlock.Builder values = blockFactory.newLongBlockBuilder(positionCount * 2); + IntVector.FixedBuilder sampleCounts = blockFactory.newIntVectorFixedBuilder(positionCount); DoubleVector.FixedBuilder resets = blockFactory.newDoubleVectorFixedBuilder(positionCount) ) { for (int i = 0; i < positionCount; i++) { @@ -293,45 +304,91 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive values.appendLong(v); } values.endPositionEntry(); - + sampleCounts.appendInt(i, state.sampleCount); resets.appendDouble(i, state.reset); } else { timestamps.appendNull(); values.appendNull(); + sampleCounts.appendInt(i, 0); resets.appendDouble(i, 0); } } blocks[offset] = timestamps.build(); blocks[offset + 1] = values.build(); - blocks[offset + 2] = resets.build().asBlock(); + blocks[offset + 2] = sampleCounts.build().asBlock(); + blocks[offset + 3] = resets.build().asBlock(); } } - Block evaluateFinal(IntVector selected, BlockFactory blockFactory) { + private static double computeRate(LongRateState state, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + final double firstValue = state.values[len - 1]; + final double lastValue = state.values[0] + reset; + return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); + } + + private static double computeRate(LongRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + double firstValue = state.values[len - 
1]; + double lastValue = state.values[0] + reset; + final double sampleTS = lastTS - firstTS; + final double averageSampleInterval = sampleTS / state.sampleCount; + final double slope = (lastValue - firstValue) / sampleTS; + double startGap = firstTS - rangeStart; + if (startGap > 0) { + if (startGap > averageSampleInterval * 1.1) { + startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval + } + firstValue = Math.max(0.0, firstValue - startGap * slope); + } + double endGap = rangeEnd - lastTS; + if (endGap > 0) { + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; + } + return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { int positionCount = selected.getPositionCount(); - try (DoubleBlock.Builder rates = blockFactory.newDoubleBlockBuilder(positionCount)) { + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { for (int p = 0; p < positionCount; p++) { final var groupId = selected.getInt(p); final var state = groupId < states.size() ? states.get(groupId) : null; - if (state == null) { + if (state == null || state.sampleCount < 2) { rates.appendNull(); continue; } int len = state.entries(); - long dt = state.timestamps[0] - state.timestamps[len - 1]; - if (dt == 0) { - // TODO: maybe issue warning when we don't have enough sample? - rates.appendNull(); + final double rate; + if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { + rate = computeRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis); } else { - double reset = state.reset; - for (int i = 1; i < len; i++) { - if (state.values[i - 1] < state.values[i]) { - reset += state.values[i]; - } - } - double dv = state.values[0] - state.values[len - 1] + reset; - rates.appendDouble(dv * unitInMillis / dt); + rate = computeRate(state, unitInMillis); } + rates.appendDouble(rate); } return rates.build(); } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java index cf0749fcf9e99..4327897cbc0cc 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java @@ -28,6 +28,7 @@ public final class RateDoubleGroupingAggregatorFunction implements GroupingAggre private static final List INTERMEDIATE_STATE_DESC = List.of( new IntermediateStateDesc("timestamps", ElementType.LONG), new IntermediateStateDesc("values", ElementType.DOUBLE), + new IntermediateStateDesc("sampleCounts", ElementType.INT), new IntermediateStateDesc("resets", ElementType.DOUBLE) ); private final RateDoubleAggregator.DoubleRateGroupingState state; @@ -189,15 +190,20 @@ public void addIntermediateInput(int positionOffset, IntVector groups, Page page return; } DoubleBlock values = (DoubleBlock) valuesUncast; - Block resetsUncast = page.getBlock(channels.get(2)); + Block sampleCountsUncast = page.getBlock(channels.get(2)); + if (sampleCountsUncast.areAllValuesNull()) { + return; + } + IntVector 
sampleCounts = ((IntBlock) sampleCountsUncast).asVector(); + Block resetsUncast = page.getBlock(channels.get(3)); if (resetsUncast.areAllValuesNull()) { return; } DoubleVector resets = ((DoubleBlock) resetsUncast).asVector(); - assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == sampleCounts.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { int groupId = groups.getInt(groupPosition); - RateDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); + RateDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, sampleCounts.getInt(groupPosition + positionOffset), resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateFloatGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateFloatGroupingAggregatorFunction.java index b3c346c12e31f..f37e0121a1eb6 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateFloatGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateFloatGroupingAggregatorFunction.java @@ -30,6 +30,7 @@ public final class RateFloatGroupingAggregatorFunction implements GroupingAggreg private static final List INTERMEDIATE_STATE_DESC = List.of( new IntermediateStateDesc("timestamps", ElementType.LONG), new IntermediateStateDesc("values", ElementType.FLOAT), + new IntermediateStateDesc("sampleCounts", ElementType.INT), new IntermediateStateDesc("resets", ElementType.DOUBLE) ); private final RateFloatAggregator.FloatRateGroupingState state; @@ -191,15 +192,20 @@ public void addIntermediateInput(int positionOffset, IntVector groups, Page page return; } FloatBlock values = (FloatBlock) valuesUncast; - Block resetsUncast = page.getBlock(channels.get(2)); + Block sampleCountsUncast = page.getBlock(channels.get(2)); + if (sampleCountsUncast.areAllValuesNull()) { + return; + } + IntVector sampleCounts = ((IntBlock) sampleCountsUncast).asVector(); + Block resetsUncast = page.getBlock(channels.get(3)); if (resetsUncast.areAllValuesNull()) { return; } DoubleVector resets = ((DoubleBlock) resetsUncast).asVector(); - assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == sampleCounts.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { int groupId = groups.getInt(groupPosition); - RateFloatAggregator.combineIntermediate(state, groupId, timestamps, values, resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); + RateFloatAggregator.combineIntermediate(state, groupId, timestamps, values, sampleCounts.getInt(groupPosition + positionOffset), resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); } } diff --git 
a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java index 769c7b126a24f..b5b8a073e3713 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java @@ -28,6 +28,7 @@ public final class RateIntGroupingAggregatorFunction implements GroupingAggregat private static final List INTERMEDIATE_STATE_DESC = List.of( new IntermediateStateDesc("timestamps", ElementType.LONG), new IntermediateStateDesc("values", ElementType.INT), + new IntermediateStateDesc("sampleCounts", ElementType.INT), new IntermediateStateDesc("resets", ElementType.DOUBLE) ); private final RateIntAggregator.IntRateGroupingState state; @@ -189,15 +190,20 @@ public void addIntermediateInput(int positionOffset, IntVector groups, Page page return; } IntBlock values = (IntBlock) valuesUncast; - Block resetsUncast = page.getBlock(channels.get(2)); + Block sampleCountsUncast = page.getBlock(channels.get(2)); + if (sampleCountsUncast.areAllValuesNull()) { + return; + } + IntVector sampleCounts = ((IntBlock) sampleCountsUncast).asVector(); + Block resetsUncast = page.getBlock(channels.get(3)); if (resetsUncast.areAllValuesNull()) { return; } DoubleVector resets = ((DoubleBlock) resetsUncast).asVector(); - assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == sampleCounts.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { int groupId = groups.getInt(groupPosition); - RateIntAggregator.combineIntermediate(state, groupId, timestamps, values, resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); + RateIntAggregator.combineIntermediate(state, groupId, timestamps, values, sampleCounts.getInt(groupPosition + positionOffset), resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); } } diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java index 1cee7d6845130..a9c6cf14ee78d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java @@ -28,6 +28,7 @@ public final class RateLongGroupingAggregatorFunction implements GroupingAggrega private static final List INTERMEDIATE_STATE_DESC = List.of( new IntermediateStateDesc("timestamps", ElementType.LONG), new IntermediateStateDesc("values", ElementType.LONG), + new IntermediateStateDesc("sampleCounts", ElementType.INT), new IntermediateStateDesc("resets", ElementType.DOUBLE) ); private final RateLongAggregator.LongRateGroupingState state; @@ -189,15 +190,20 @@ public void addIntermediateInput(int positionOffset, IntVector groups, Page page return; } 
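Reviewer note (not part of the patch): after this change every rate aggregator ships four intermediate channels per group, and the generated functions read them positionally as in the hunk below:

    // channels.get(0) -> timestamps   : LONG_BLOCK, multi-valued, descending
    // channels.get(1) -> values       : LONG_BLOCK (DOUBLE/FLOAT/INT in the sibling classes)
    // channels.get(2) -> sampleCounts : INT vector, the true number of observed samples
    // channels.get(3) -> resets       : DOUBLE vector, accumulated counter resets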
LongBlock values = (LongBlock) valuesUncast; - Block resetsUncast = page.getBlock(channels.get(2)); + Block sampleCountsUncast = page.getBlock(channels.get(2)); + if (sampleCountsUncast.areAllValuesNull()) { + return; + } + IntVector sampleCounts = ((IntBlock) sampleCountsUncast).asVector(); + Block resetsUncast = page.getBlock(channels.get(3)); if (resetsUncast.areAllValuesNull()) { return; } DoubleVector resets = ((DoubleBlock) resetsUncast).asVector(); - assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == sampleCounts.getPositionCount() && timestamps.getPositionCount() == resets.getPositionCount(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { int groupId = groups.getInt(groupPosition); - RateLongAggregator.combineIntermediate(state, groupId, timestamps, values, resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); + RateLongAggregator.combineIntermediate(state, groupId, timestamps, values, sampleCounts.getInt(groupPosition + positionOffset), resets.getDouble(groupPosition + positionOffset), groupPosition + positionOffset); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TimeSeriesGroupingAggregatorEvaluationContext.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TimeSeriesGroupingAggregatorEvaluationContext.java new file mode 100644 index 0000000000000..5e46922bace35 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TimeSeriesGroupingAggregatorEvaluationContext.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.compute.operator.DriverContext; + +public abstract class TimeSeriesGroupingAggregatorEvaluationContext extends GroupingAggregatorEvaluationContext { + public TimeSeriesGroupingAggregatorEvaluationContext(DriverContext driverContext) { + super(driverContext); + } + + public abstract long rangeStartInMillis(int groupId); + + public abstract long rangeEndInMillis(int groupId); +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st index e1c829eda4c2e..3eb12f5149163 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st @@ -40,6 +40,7 @@ import java.util.Arrays; value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "$TYPE$_BLOCK"), + @IntermediateState(name = "sampleCounts", type = "INT"), @IntermediateState(name = "resets", type = "DOUBLE") } ) public class Rate$Type$Aggregator { @@ -57,10 +58,11 @@ public class Rate$Type$Aggregator { int groupId, LongBlock timestamps, $Type$Block values, + int sampleCount, double reset, int otherPosition ) { - current.combine(groupId, timestamps, values, reset, otherPosition); + current.combine(groupId, timestamps, values, sampleCount, reset, otherPosition); } public static void combineStates( @@ -75,15 +77,17 @@ public class Rate$Type$Aggregator { public static Block evaluateFinal( $Type$RateGroupingState state, IntVector selected, - GroupingAggregatorEvaluationContext evaluatorContext + GroupingAggregatorEvaluationContext evalContext ) { - return state.evaluateFinal(selected, evaluatorContext.blockFactory()); + return state.evaluateFinal(selected, evalContext); } private static class $Type$RateState { static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject($Type$RateState.class); final long[] timestamps; // descending order final $type$[] values; + // the timestamps and values arrays might have collapsed to fewer values than the actual sample count + int sampleCount = 0; double reset = 0; $Type$RateState(int initialSize) { @@ -94,6 +98,7 @@ public class Rate$Type$Aggregator { $Type$RateState(long[] ts, $type$[] vs) { this.timestamps = ts; this.values = vs; + this.sampleCount = values.length; } private $type$ dv($type$ v0, $type$ v1) { @@ -107,6 +112,7 @@ public class Rate$Type$Aggregator { reset += dv(v, values[1]) + dv(values[1], values[0]) - dv(v, values[0]); timestamps[1] = t; values[1] = v; + sampleCount++; } int entries() { @@ -163,7 +169,7 @@ public class Rate$Type$Aggregator { } } - void combine(int groupId, LongBlock timestamps, $Type$Block values, double reset, int otherPosition) { + void combine(int groupId, LongBlock timestamps, $Type$Block values, int sampleCount, double reset, int otherPosition) { final int valueCount = timestamps.getValueCount(otherPosition); if (valueCount == 0) { return; @@ -175,6 +181,7 @@ public class Rate$Type$Aggregator { adjustBreaker($Type$RateState.bytesUsed(valueCount)); state = new $Type$RateState(valueCount); state.reset = reset; + state.sampleCount = sampleCount; states.set(groupId, state); // TODO: add bulk_copy to Block for (int i = 0; i < valueCount; i++) { @@ -185,6 +192,7 @@ public class Rate$Type$Aggregator { 
adjustBreaker($Type$RateState.bytesUsed(state.entries() + valueCount)); var newState = new $Type$RateState(state.entries() + valueCount); newState.reset = state.reset + reset; + newState.sampleCount = state.sampleCount + sampleCount; states.set(groupId, newState); merge(state, newState, firstIndex, valueCount, timestamps, values); adjustBreaker(-$Type$RateState.bytesUsed(state.entries())); // old state @@ -232,6 +240,7 @@ public class Rate$Type$Aggregator { adjustBreaker($Type$RateState.bytesUsed(len)); curr = new $Type$RateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len)); curr.reset = other.reset; + curr.sampleCount = other.sampleCount; states.set(groupId, curr); } else { states.set(groupId, mergeState(curr, other)); @@ -243,6 +252,7 @@ public class Rate$Type$Aggregator { adjustBreaker($Type$RateState.bytesUsed(newLen)); var dst = new $Type$RateState(newLen); dst.reset = s1.reset + s2.reset; + dst.sampleCount = s1.sampleCount + s2.sampleCount; int i = 0, j = 0, k = 0; while (i < s1.entries() && j < s2.entries()) { if (s1.timestamps[i] > s2.timestamps[j]) { @@ -281,6 +291,7 @@ public class Rate$Type$Aggregator { try ( LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); $Type$Block.Builder values = blockFactory.new$Type$BlockBuilder(positionCount * 2); + IntVector.FixedBuilder sampleCounts = blockFactory.newIntVectorFixedBuilder(positionCount); DoubleVector.FixedBuilder resets = blockFactory.newDoubleVectorFixedBuilder(positionCount) ) { for (int i = 0; i < positionCount; i++) { @@ -298,45 +309,91 @@ public class Rate$Type$Aggregator { values.append$Type$(v); } values.endPositionEntry(); - + sampleCounts.appendInt(i, state.sampleCount); resets.appendDouble(i, state.reset); } else { timestamps.appendNull(); values.appendNull(); + sampleCounts.appendInt(i, 0); resets.appendDouble(i, 0); } } blocks[offset] = timestamps.build(); blocks[offset + 1] = values.build(); - blocks[offset + 2] = resets.build().asBlock(); + blocks[offset + 2] = sampleCounts.build().asBlock(); + blocks[offset + 3] = resets.build().asBlock(); } } - Block evaluateFinal(IntVector selected, BlockFactory blockFactory) { + private static double computeRate($Type$RateState state, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + final double firstValue = state.values[len - 1]; + final double lastValue = state.values[0] + reset; + return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); + } + + private static double computeRate($Type$RateState state, long rangeStart, long rangeEnd, long unitInMillis) { + final int len = state.entries(); + final long firstTS = state.timestamps[state.timestamps.length - 1]; + final long lastTS = state.timestamps[0]; + double reset = state.reset; + for (int i = 1; i < len; i++) { + if (state.values[i - 1] < state.values[i]) { + reset += state.values[i]; + } + } + double firstValue = state.values[len - 1]; + double lastValue = state.values[0] + reset; + final double sampleTS = lastTS - firstTS; + final double averageSampleInterval = sampleTS / state.sampleCount; + final double slope = (lastValue - firstValue) / sampleTS; + double startGap = firstTS - rangeStart; + if (startGap > 0) { + if (startGap > averageSampleInterval * 1.1) { + startGap = averageSampleInterval 
/ 2.0; // limit to half of the average of the sample interval + } + firstValue = Math.max(0.0, firstValue - startGap * slope); + } + double endGap = rangeEnd - lastTS; + if (endGap > 0) { + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; + } + return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { int positionCount = selected.getPositionCount(); - try (DoubleBlock.Builder rates = blockFactory.newDoubleBlockBuilder(positionCount)) { + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { for (int p = 0; p < positionCount; p++) { final var groupId = selected.getInt(p); final var state = groupId < states.size() ? states.get(groupId) : null; - if (state == null) { + if (state == null || state.sampleCount < 2) { rates.appendNull(); continue; } int len = state.entries(); - long dt = state.timestamps[0] - state.timestamps[len - 1]; - if (dt == 0) { - // TODO: maybe issue warning when we don't have enough sample? - rates.appendNull(); + final double rate; + if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { + rate = computeRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis); } else { - double reset = state.reset; - for (int i = 1; i < len; i++) { - if (state.values[i - 1] < state.values[i]) { - reset += state.values[i]; - } - } - double dv = state.values[0] - state.values[len - 1] + reset; - rates.appendDouble(dv * unitInMillis / dt); + rate = computeRate(state, unitInMillis); } + rates.appendDouble(rate); } return rates.build(); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index e40683948a966..75d5a5bc51323 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -86,7 +86,7 @@ public String describe() { private final List aggregators; - private final DriverContext driverContext; + protected final DriverContext driverContext; /** * Nanoseconds this operator has spent hashing grouping keys. 
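Reviewer note (not part of the patch): the hunk below turns context creation into an overridable hook, evaluationContext(Block[] keys), so TimeSeriesAggregationOperator can hand rate aggregators per-group bucket ranges. For unit tests, a fixed-range context is enough to exercise the extrapolation path; this sketch uses only the constructor and the two abstract methods the new class defines, and the helper name is illustrative:

    // Illustrative test helper (not patch code).
    static TimeSeriesGroupingAggregatorEvaluationContext fixedRange(DriverContext driverContext, long start, long end) {
        return new TimeSeriesGroupingAggregatorEvaluationContext(driverContext) {
            @Override
            public long rangeStartInMillis(int groupId) {
                return start;
            }

            @Override
            public long rangeEndInMillis(int groupId) {
                return end;
            }
        };
    }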
@@ -226,7 +226,7 @@ public void finish() { blocks = new Block[keys.length + Arrays.stream(aggBlockCounts).sum()]; System.arraycopy(keys, 0, blocks, 0, keys.length); int offset = keys.length; - var evaluationContext = new GroupingAggregatorEvaluationContext(driverContext); + var evaluationContext = evaluationContext(keys); for (int i = 0; i < aggregators.size(); i++) { var aggregator = aggregators.get(i); aggregator.evaluate(blocks, offset, selected, evaluationContext); @@ -245,6 +245,10 @@ public void finish() { } } + protected GroupingAggregatorEvaluationContext evaluationContext(Block[] keys) { + return new GroupingAggregatorEvaluationContext(driverContext); + } + @Override public boolean isFinished() { return finished && output == null; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java index 736e88aa3cf82..be79cb48ef867 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java @@ -11,7 +11,12 @@ import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.aggregation.GroupingAggregator; +import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext; +import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.LongBlock; import java.util.List; import java.util.function.Supplier; @@ -67,4 +72,23 @@ public TimeSeriesAggregationOperator( super(aggregators, blockHash, driverContext); this.timeBucket = timeBucket; } + + @Override + protected GroupingAggregatorEvaluationContext evaluationContext(Block[] keys) { + if (keys.length < 2) { + return super.evaluationContext(keys); + } + final LongBlock timestamps = keys[0].elementType() == ElementType.LONG ? 
(LongBlock) keys[0] : (LongBlock) keys[1]; + return new TimeSeriesGroupingAggregatorEvaluationContext(driverContext) { + @Override + public long rangeStartInMillis(int groupId) { + return timestamps.getLong(groupId); + } + + @Override + public long rangeEndInMillis(int groupId) { + return timeBucket.nextRoundingValue(timestamps.getLong(groupId)); + } + }; + } } From 7c91bee8629f48ada6171d9cd7ed371ce4f42bb3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 4 Apr 2025 09:39:06 -0700 Subject: [PATCH 2/6] Add test --- .../aggregation/RateDoubleAggregator.java | 29 +- .../aggregation/RateFloatAggregator.java | 29 +- .../aggregation/RateIntAggregator.java | 29 +- .../aggregation/RateLongAggregator.java | 29 +- .../aggregation/X-RateAggregator.java.st | 29 +- ...imeSeriesAggregationOperatorFactories.java | 164 --------- .../TimeSeriesAggregationOperatorTests.java | 346 ------------------ .../main/resources/k8s-timeseries.csv-spec | 42 +-- .../xpack/esql/action/TimeSeriesIT.java | 2 + 9 files changed, 78 insertions(+), 621 deletions(-) delete mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java delete mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java index f3c5886b1009d..49caa791fd1a0 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java @@ -69,11 +69,7 @@ public static void combineStates( current.combineState(currentGroupId, otherState, otherGroupId); } - public static Block evaluateFinal( - DoubleRateGroupingState state, - IntVector selected, - GroupingAggregatorEvaluationContext evalContext - ) { + public static Block evaluateFinal(DoubleRateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { return state.evaluateFinal(selected, evalContext); } @@ -352,17 +348,18 @@ private static double computeRate(DoubleRateState state, long rangeStart, long r final double slope = (lastValue - firstValue) / sampleTS; double startGap = firstTS - rangeStart; if (startGap > 0) { - if (startGap > averageSampleInterval * 1.1) { - startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval - } - firstValue = Math.max(0.0, firstValue - startGap * slope); + if (startGap > averageSampleInterval * 1.1) { + // limit to half of the average sample interval if samples are far from the boundary + startGap = averageSampleInterval / 2.0; + } + firstValue = Math.max(0.0, firstValue - startGap * slope); } double endGap = rangeEnd - lastTS; if (endGap > 0) { - if (endGap > averageSampleInterval * 1.1) { - endGap = averageSampleInterval / 2.0; - } - lastValue = lastValue + endGap * slope; + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; } return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); } @@ -380,11 +377,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof 
TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate( - state, - tsContext.rangeStartInMillis(groupId), - tsContext.rangeEndInMillis(groupId), - unitInMillis); + rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { rate = computeRate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java index 207bfa9e885a3..1198305ea6863 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java @@ -70,11 +70,7 @@ public static void combineStates( current.combineState(currentGroupId, otherState, otherGroupId); } - public static Block evaluateFinal( - FloatRateGroupingState state, - IntVector selected, - GroupingAggregatorEvaluationContext evalContext - ) { + public static Block evaluateFinal(FloatRateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { return state.evaluateFinal(selected, evalContext); } @@ -353,17 +349,18 @@ private static double computeRate(FloatRateState state, long rangeStart, long ra final double slope = (lastValue - firstValue) / sampleTS; double startGap = firstTS - rangeStart; if (startGap > 0) { - if (startGap > averageSampleInterval * 1.1) { - startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval - } - firstValue = Math.max(0.0, firstValue - startGap * slope); + if (startGap > averageSampleInterval * 1.1) { + // limit to half of the average sample interval if samples are far from the boundary + startGap = averageSampleInterval / 2.0; + } + firstValue = Math.max(0.0, firstValue - startGap * slope); } double endGap = rangeEnd - lastTS; if (endGap > 0) { - if (endGap > averageSampleInterval * 1.1) { - endGap = averageSampleInterval / 2.0; - } - lastValue = lastValue + endGap * slope; + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; } return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); } @@ -381,11 +378,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate( - state, - tsContext.rangeStartInMillis(groupId), - tsContext.rangeEndInMillis(groupId), - unitInMillis); + rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { rate = computeRate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java index 97940fbef7dad..083619c8bfd28 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java @@ -70,11 +70,7 @@ public static void combineStates( current.combineState(currentGroupId, otherState, 
otherGroupId); } - public static Block evaluateFinal( - IntRateGroupingState state, - IntVector selected, - GroupingAggregatorEvaluationContext evalContext - ) { + public static Block evaluateFinal(IntRateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { return state.evaluateFinal(selected, evalContext); } @@ -353,17 +349,18 @@ private static double computeRate(IntRateState state, long rangeStart, long rang final double slope = (lastValue - firstValue) / sampleTS; double startGap = firstTS - rangeStart; if (startGap > 0) { - if (startGap > averageSampleInterval * 1.1) { - startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval - } - firstValue = Math.max(0.0, firstValue - startGap * slope); + if (startGap > averageSampleInterval * 1.1) { + // limit to half of the average sample interval if samples are far from the boundary + startGap = averageSampleInterval / 2.0; + } + firstValue = Math.max(0.0, firstValue - startGap * slope); } double endGap = rangeEnd - lastTS; if (endGap > 0) { - if (endGap > averageSampleInterval * 1.1) { - endGap = averageSampleInterval / 2.0; - } - lastValue = lastValue + endGap * slope; + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; } return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); } @@ -381,11 +378,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate( - state, - tsContext.rangeStartInMillis(groupId), - tsContext.rangeEndInMillis(groupId), - unitInMillis); + rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { rate = computeRate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java index 68186d18c49e4..d9c5808b5deaf 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java @@ -69,11 +69,7 @@ public static void combineStates( current.combineState(currentGroupId, otherState, otherGroupId); } - public static Block evaluateFinal( - LongRateGroupingState state, - IntVector selected, - GroupingAggregatorEvaluationContext evalContext - ) { + public static Block evaluateFinal(LongRateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { return state.evaluateFinal(selected, evalContext); } @@ -352,17 +348,18 @@ private static double computeRate(LongRateState state, long rangeStart, long ran final double slope = (lastValue - firstValue) / sampleTS; double startGap = firstTS - rangeStart; if (startGap > 0) { - if (startGap > averageSampleInterval * 1.1) { - startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval - } - firstValue = Math.max(0.0, firstValue - startGap * slope); + if (startGap > averageSampleInterval * 1.1) { + // limit to half of the average sample interval if samples are far from the boundary + startGap = averageSampleInterval / 2.0; + } + firstValue = 
Math.max(0.0, firstValue - startGap * slope); } double endGap = rangeEnd - lastTS; if (endGap > 0) { - if (endGap > averageSampleInterval * 1.1) { - endGap = averageSampleInterval / 2.0; - } - lastValue = lastValue + endGap * slope; + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; } return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); } @@ -380,11 +377,7 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate( - state, - tsContext.rangeStartInMillis(groupId), - tsContext.rangeEndInMillis(groupId), - unitInMillis); + rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { rate = computeRate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st index 3eb12f5149163..dec08d0d6cdd7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st @@ -74,11 +74,7 @@ public class Rate$Type$Aggregator { current.combineState(currentGroupId, otherState, otherGroupId); } - public static Block evaluateFinal( - $Type$RateGroupingState state, - IntVector selected, - GroupingAggregatorEvaluationContext evalContext - ) { + public static Block evaluateFinal($Type$RateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { return state.evaluateFinal(selected, evalContext); } @@ -357,17 +353,18 @@ public class Rate$Type$Aggregator { final double slope = (lastValue - firstValue) / sampleTS; double startGap = firstTS - rangeStart; if (startGap > 0) { - if (startGap > averageSampleInterval * 1.1) { - startGap = averageSampleInterval / 2.0; // limit to half of the average of the sample interval - } - firstValue = Math.max(0.0, firstValue - startGap * slope); + if (startGap > averageSampleInterval * 1.1) { + // limit to half of the average sample interval if samples are far from the boundary + startGap = averageSampleInterval / 2.0; + } + firstValue = Math.max(0.0, firstValue - startGap * slope); } double endGap = rangeEnd - lastTS; if (endGap > 0) { - if (endGap > averageSampleInterval * 1.1) { - endGap = averageSampleInterval / 2.0; - } - lastValue = lastValue + endGap * slope; + if (endGap > averageSampleInterval * 1.1) { + endGap = averageSampleInterval / 2.0; + } + lastValue = lastValue + endGap * slope; } return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart); } @@ -385,11 +382,7 @@ public class Rate$Type$Aggregator { int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate( - state, - tsContext.rangeStartInMillis(groupId), - tsContext.rangeEndInMillis(groupId), - unitInMillis); + rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { rate = computeRate(state, unitInMillis); } diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java deleted file mode 100644 index e6b30a4797d45..0000000000000 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.compute.operator; - -import org.elasticsearch.common.Rounding; -import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; -import org.elasticsearch.compute.aggregation.AggregatorMode; -import org.elasticsearch.compute.aggregation.GroupingAggregator; -import org.elasticsearch.compute.aggregation.blockhash.BlockHash; -import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash; -import org.elasticsearch.compute.data.ElementType; - -import java.util.ArrayList; -import java.util.List; - -/** - * This class provides operator factories for time-series aggregations. - * A time-series aggregation executes in three stages, deviating from the typical two-stage aggregation. - * For example: {@code sum(rate(write_requests)), avg(cpu) BY cluster, time-bucket} - * - * 1. Initial Stage: - * In this stage, a standard hash aggregation is executed, grouped by tsid and time-bucket. - * The {@code values} aggregations are added to collect values of the grouping keys excluding the time-bucket, - * which are then used for final result grouping. - * {@code rate[INITIAL](write_requests), avg[INITIAL](cpu), values[SINGLE](cluster) BY tsid, time-bucket} - * - * 2. Intermediate Stage: - * Equivalent to the final mode of a standard hash aggregation. - * This stage merges and reduces the result of the rate aggregations, - * but merges (without reducing) the results of non-rate aggregations. - * {@code rate[FINAL](write_requests), avg[INTERMEDIATE](cpu), values[SINGLE](cluster) BY tsid, time-bucket} - * - * 3. Final Stage: - * This extra stage performs outer aggregations over the rate results - * and combines the intermediate results of non-rate aggregations using the specified user-defined grouping keys. 
- * {@code sum[SINGLE](rate_result), avg[FINAL](cpu) BY cluster, bucket} - */ -public final class TimeSeriesAggregationOperatorFactories { - - public record SupplierWithChannels(AggregatorFunctionSupplier supplier, List<Integer> channels) {} - - public record Initial( - int tsHashChannel, - int timeBucketChannel, - Rounding.Prepared timeBucket, - List<BlockHash.GroupSpec> groupings, - List<SupplierWithChannels> rates, - List<SupplierWithChannels> nonRates, - int maxPageSize - ) implements Operator.OperatorFactory { - @Override - public Operator get(DriverContext driverContext) { - List<GroupingAggregator.Factory> aggregators = new ArrayList<>(groupings.size() + rates.size() + nonRates.size()); - for (SupplierWithChannels f : rates) { - aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.INITIAL, f.channels)); - } - for (SupplierWithChannels f : nonRates) { - aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.INITIAL, f.channels)); - } - aggregators.addAll(valuesAggregatorForGroupings(groupings, timeBucketChannel)); - return new TimeSeriesAggregationOperator( - timeBucket, - aggregators, - () -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext.blockFactory()), - driverContext - ); - } - - @Override - public String describe() { - return "TimeSeriesInitialAggregationOperatorFactory"; - } - } - - public record Intermediate( - int tsHashChannel, - int timeBucketChannel, - Rounding.Prepared timeBucket, - List<BlockHash.GroupSpec> groupings, - List<SupplierWithChannels> rates, - List<SupplierWithChannels> nonRates, - int maxPageSize - ) implements Operator.OperatorFactory { - @Override - public Operator get(DriverContext driverContext) { - List<GroupingAggregator.Factory> aggregators = new ArrayList<>(groupings.size() + rates.size() + nonRates.size()); - for (SupplierWithChannels f : rates) { - aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.FINAL, f.channels)); - } - for (SupplierWithChannels f : nonRates) { - aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.INTERMEDIATE, f.channels)); - } - aggregators.addAll(valuesAggregatorForGroupings(groupings, timeBucketChannel)); - List<BlockHash.GroupSpec> hashGroups = List.of( - new BlockHash.GroupSpec(tsHashChannel, ElementType.BYTES_REF), - new BlockHash.GroupSpec(timeBucketChannel, ElementType.LONG) - ); - return new TimeSeriesAggregationOperator( - timeBucket, - aggregators, - () -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, true), - driverContext - ); - } - - @Override - public String describe() { - return "TimeSeriesIntermediateAggregationOperatorFactory"; - } - } - - public record Final( - List<BlockHash.GroupSpec> groupings, - List<SupplierWithChannels> outerRates, - List<SupplierWithChannels> nonRates, - int maxPageSize - ) implements Operator.OperatorFactory { - @Override - public Operator get(DriverContext driverContext) { - List<GroupingAggregator.Factory> aggregators = new ArrayList<>(outerRates.size() + nonRates.size()); - for (SupplierWithChannels f : outerRates) { - aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.SINGLE, f.channels)); - } - for (SupplierWithChannels f : nonRates) { - aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.FINAL, f.channels)); - } - return new HashAggregationOperator( - aggregators, - () -> BlockHash.build(groupings, driverContext.blockFactory(), maxPageSize, false), - driverContext - ); - } - - @Override - public String describe() { - return "TimeSeriesFinalAggregationOperatorFactory"; - } - } - - static List<GroupingAggregator.Factory> valuesAggregatorForGroupings(List<BlockHash.GroupSpec> groupings, int timeBucketChannel) { - List<GroupingAggregator.Factory> aggregators = new ArrayList<>(); - for (BlockHash.GroupSpec g : groupings) { - if (g.channel() != timeBucketChannel) { - // TODO: perhaps introduce a specialized aggregator
for this? - var aggregatorSupplier = (switch (g.elementType()) { - case BYTES_REF -> new org.elasticsearch.compute.aggregation.ValuesBytesRefAggregatorFunctionSupplier(); - case DOUBLE -> new org.elasticsearch.compute.aggregation.ValuesDoubleAggregatorFunctionSupplier(); - case INT -> new org.elasticsearch.compute.aggregation.ValuesIntAggregatorFunctionSupplier(); - case LONG -> new org.elasticsearch.compute.aggregation.ValuesLongAggregatorFunctionSupplier(); - case BOOLEAN -> new org.elasticsearch.compute.aggregation.ValuesBooleanAggregatorFunctionSupplier(); - case FLOAT, NULL, DOC, COMPOSITE, UNKNOWN -> throw new IllegalArgumentException("unsupported grouping type"); - }); - final List<Integer> channels = List.of(g.channel()); - aggregators.add(aggregatorSupplier.groupingAggregatorFactory(AggregatorMode.SINGLE, channels)); - } - } - return aggregators; - } -} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java deleted file mode 100644 index c1ed7ae510172..0000000000000 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.compute.operator; - -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.store.Directory; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.Rounding; -import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.compute.aggregation.RateLongAggregatorFunctionSupplier; -import org.elasticsearch.compute.aggregation.SumDoubleAggregatorFunctionSupplier; -import org.elasticsearch.compute.aggregation.blockhash.BlockHash; -import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.BlockUtils; -import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.data.LongBlock; -import org.elasticsearch.compute.data.LongVector; -import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorTests; -import org.elasticsearch.compute.test.ComputeTestCase; -import org.elasticsearch.compute.test.OperatorTestCase; -import org.elasticsearch.compute.test.TestDriverFactory; -import org.elasticsearch.compute.test.TestResultPageSinkOperator; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.mapper.KeywordFieldMapper; -import org.elasticsearch.index.mapper.NumberFieldMapper; -import org.junit.After; - -import java.io.IOException; -import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.IntStream; - -import static org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorTests.createTimeSeriesSourceOperator; -import static org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorTests.writeTS; -import static org.elasticsearch.compute.operator.TimeSeriesAggregationOperatorFactories.SupplierWithChannels; -import static org.hamcrest.Matchers.equalTo; - -public
class TimeSeriesAggregationOperatorTests extends ComputeTestCase { - - private IndexReader reader = null; - private Directory directory = null; - - @After - public void cleanup() throws IOException { - IOUtils.close(reader, directory); - } - - /** - * A {@link DriverContext} with a nonBreakingBigArrays. - */ - protected DriverContext driverContext() { // TODO make this final once all operators support memory tracking - BlockFactory blockFactory = blockFactory(); - return new DriverContext(blockFactory.bigArrays(), blockFactory); - } - - public void testBasicRate() throws Exception { - long[] v1 = { 1, 1, 3, 0, 2, 9, 21, 3, 7, 7, 9, 12 }; - long[] t1 = { 1, 5, 11, 20, 21, 59, 88, 91, 92, 97, 99, 112 }; - - long[] v2 = { 7, 2, 0, 11, 24, 0, 4, 1, 10, 2 }; - long[] t2 = { 1, 2, 4, 5, 6, 8, 10, 11, 12, 14 }; - - long[] v3 = { 0, 1, 0, 1, 1, 4, 2, 2, 2, 2, 3, 5, 5 }; - long[] t3 = { 2, 3, 5, 7, 8, 9, 10, 12, 14, 15, 18, 20, 22 }; - List<Pod> pods = List.of( - new Pod("p1", "cluster_1", new Interval(2100, t1, v1)), - new Pod("p2", "cluster_1", new Interval(600, t2, v2)), - new Pod("p3", "cluster_2", new Interval(1100, t3, v3)) - ); - long unit = between(1, 5); - { - List<List<Object>> actual = runRateTest( - pods, - List.of("cluster"), - TimeValue.timeValueMillis(unit), - TimeValue.timeValueMillis(500) - ); - List<List<Object>> expected = List.of( - List.of(new BytesRef("cluster_1"), 35.0 * unit / 111.0 + 42.0 * unit / 13.0), - List.of(new BytesRef("cluster_2"), 10.0 * unit / 20.0) - ); - assertThat(actual, equalTo(expected)); - } - { - List<List<Object>> actual = runRateTest(pods, List.of("pod"), TimeValue.timeValueMillis(unit), TimeValue.timeValueMillis(500)); - List<List<Object>> expected = List.of( - List.of(new BytesRef("p1"), 35.0 * unit / 111.0), - List.of(new BytesRef("p2"), 42.0 * unit / 13.0), - List.of(new BytesRef("p3"), 10.0 * unit / 20.0) - ); - assertThat(actual, equalTo(expected)); - } - { - List<List<Object>> actual = runRateTest( - pods, - List.of("cluster", "bucket"), - TimeValue.timeValueMillis(unit), - TimeValue.timeValueMillis(500) - ); - List<List<Object>> expected = List.of( - List.of(new BytesRef("cluster_1"), 2000L, 35.0 * unit / 111.0), - List.of(new BytesRef("cluster_1"), 500L, 42.0 * unit / 13.0), - List.of(new BytesRef("cluster_2"), 1000L, 10.0 * unit / 20.0) - ); - assertThat(actual, equalTo(expected)); - } - } - - public void testRateWithInterval() throws Exception { - long[] v1 = { 1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3 }; - long[] t1 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 }; - - long[] v2 = { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }; - long[] t2 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 }; - - long[] v3 = { 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192 }; - long[] t3 = { 0, 10_000, 20_000, 30_000, 40_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000, 110_000, 120_000 }; - List<Pod> pods = List.of( - new Pod("p1", "cluster_1", new Interval(0, t1, v1)), - new Pod("p2", "cluster_2", new Interval(0, t2, v2)), - new Pod("p3", "cluster_2", new Interval(0, t3, v3)) - ); - List<List<Object>> actual = runRateTest( - pods, - List.of("pod", "bucket"), - TimeValue.timeValueMillis(1), - TimeValue.timeValueMinutes(1) - ); - List<List<Object>> expected = List.of( - List.of(new BytesRef("p1"), 120_000L, 0.0D), - List.of(new BytesRef("p1"), 60_000L, 8.0E-5D), - List.of(new BytesRef("p1"), 0L, 8.0E-5D), - List.of(new BytesRef("p2"), 120_000L, 0.0D), - List.of(new BytesRef("p2"), 60_000L, 0.0D), - List.of(new BytesRef("p2"), 0L, 0.0D), - List.of(new
BytesRef("p3"), 120_000L, 0.0D), - List.of(new BytesRef("p3"), 60_000L, 0.07936D), - List.of(new BytesRef("p3"), 0L, 0.00124D) - ); - assertThat(actual, equalTo(expected)); - } - - public void testRandomRate() throws Exception { - int numPods = between(1, 10); - List<Pod> pods = new ArrayList<>(); - TimeValue unit = TimeValue.timeValueSeconds(1); - List<List<Object>> expected = new ArrayList<>(); - for (int p = 0; p < numPods; p++) { - int numIntervals = randomIntBetween(1, 3); - Interval[] intervals = new Interval[numIntervals]; - long startTimeInHours = between(10, 100); - String podName = "p" + p; - for (int interval = 0; interval < numIntervals; interval++) { - final long startInterval = TimeValue.timeValueHours(--startTimeInHours).millis(); - int numValues = between(2, 100); - long[] values = new long[numValues]; - long[] times = new long[numValues]; - long delta = 0; - for (int i = 0; i < numValues; i++) { - values[i] = randomIntBetween(0, 100); - delta += TimeValue.timeValueSeconds(between(1, 10)).millis(); - times[i] = delta; - } - intervals[interval] = new Interval(startInterval, times, values); - if (numValues == 1) { - expected.add(List.of(new BytesRef(podName), startInterval, null)); - } else { - expected.add(List.of(new BytesRef(podName), startInterval, intervals[interval].expectedRate(unit))); - } - } - Pod pod = new Pod(podName, "cluster", intervals); - pods.add(pod); - } - List<List<Object>> actual = runRateTest(pods, List.of("pod", "bucket"), unit, TimeValue.timeValueHours(1)); - assertThat(actual, equalTo(expected)); - } - - record Interval(long offset, long[] times, long[] values) { - double expectedRate(TimeValue unit) { - double dv = 0; - for (int v = 0; v < values.length - 1; v++) { - if (values[v + 1] < values[v]) { - dv += values[v]; - } - } - dv += (values[values.length - 1] - values[0]); - long dt = times[times.length - 1] - times[0]; - return (dv * unit.millis()) / dt; - } - } - - record Pod(String name, String cluster, Interval...
intervals) {} - - List<List<Object>> runRateTest(List<Pod> pods, List<String> groupings, TimeValue unit, TimeValue bucketInterval) throws IOException { - cleanup(); - directory = newDirectory(); - long unitInMillis = unit.millis(); - record Doc(String pod, String cluster, long timestamp, long requests) { - - } - var sourceOperatorFactory = createTimeSeriesSourceOperator( - directory, - r -> this.reader = r, - Integer.MAX_VALUE, - between(1, 100), - randomBoolean(), - writer -> { - List<Doc> docs = new ArrayList<>(); - for (Pod pod : pods) { - for (Interval interval : pod.intervals) { - for (int i = 0; i < interval.times.length; i++) { - docs.add(new Doc(pod.name, pod.cluster, interval.offset + interval.times[i], interval.values[i])); - } - } - } - Randomness.shuffle(docs); - for (Doc doc : docs) { - writeTS( - writer, - doc.timestamp, - new Object[] { "pod", doc.pod, "cluster", doc.cluster }, - new Object[] { "requests", doc.requests } - ); - } - return docs.size(); - } - ); - var ctx = driverContext(); - - List<Operator> intermediateOperators = new ArrayList<>(); - final Rounding.Prepared rounding = new Rounding.Builder(bucketInterval).timeZone(ZoneOffset.UTC).build().prepareForUnknown(); - var timeBucket = new EvalOperator(ctx.blockFactory(), new EvalOperator.ExpressionEvaluator() { - @Override - public Block eval(Page page) { - LongBlock timestampsBlock = page.getBlock(2); - LongVector timestamps = timestampsBlock.asVector(); - try (var builder = blockFactory().newLongVectorFixedBuilder(timestamps.getPositionCount())) { - for (int i = 0; i < timestamps.getPositionCount(); i++) { - builder.appendLong(rounding.round(timestampsBlock.getLong(i))); - } - return builder.build().asBlock(); - } - } - - @Override - public void close() { - - } - }); - intermediateOperators.add(timeBucket); - var rateField = new NumberFieldMapper.NumberFieldType("requests", NumberFieldMapper.NumberType.LONG); - Operator extractRate = (ValuesSourceReaderOperatorTests.factory(reader, rateField, ElementType.LONG).get(ctx)); - intermediateOperators.add(extractRate); - List<String> nonBucketGroupings = new ArrayList<>(groupings); - nonBucketGroupings.remove("bucket"); - for (String grouping : nonBucketGroupings) { - var groupingField = new KeywordFieldMapper.KeywordFieldType(grouping); - intermediateOperators.add(ValuesSourceReaderOperatorTests.factory(reader, groupingField, ElementType.BYTES_REF).get(ctx)); - } - // _doc, tsid, timestamp, bucket, requests, grouping1, grouping2 - Operator initialAgg = new TimeSeriesAggregationOperatorFactories.Initial( - 1, - 3, - rounding, - IntStream.range(0, nonBucketGroupings.size()).mapToObj(n -> new BlockHash.GroupSpec(5 + n, ElementType.BYTES_REF)).toList(), - List.of(new SupplierWithChannels(new RateLongAggregatorFunctionSupplier(unitInMillis), List.of(4, 2))), - List.of(), - between(1, 100) - ).get(ctx); - - // tsid, bucket, rate[0][0],rate[0][1],rate[0][2], grouping1, grouping2 - Operator intermediateAgg = new TimeSeriesAggregationOperatorFactories.Intermediate( - 0, - 1, - rounding, - IntStream.range(0, nonBucketGroupings.size()).mapToObj(n -> new BlockHash.GroupSpec(5 + n, ElementType.BYTES_REF)).toList(), - List.of(new SupplierWithChannels(new RateLongAggregatorFunctionSupplier(unitInMillis), List.of(2, 3, 4))), - List.of(), - between(1, 100) - ).get(ctx); - // tsid, bucket, rate, grouping1, grouping2 - List<BlockHash.GroupSpec> finalGroups = new ArrayList<>(); - int groupChannel = 3; - for (String grouping : groupings) { - if (grouping.equals("bucket")) { - finalGroups.add(new BlockHash.GroupSpec(1, ElementType.LONG)); - } else { -
finalGroups.add(new BlockHash.GroupSpec(groupChannel++, ElementType.BYTES_REF)); - } - } - Operator finalAgg = new TimeSeriesAggregationOperatorFactories.Final( - finalGroups, - List.of(new SupplierWithChannels(new SumDoubleAggregatorFunctionSupplier(), List.of(2))), - List.of(), - between(1, 100) - ).get(ctx); - - List<Page> results = new ArrayList<>(); - OperatorTestCase.runDriver( - TestDriverFactory.create( - ctx, - sourceOperatorFactory.get(ctx), - CollectionUtils.concatLists(intermediateOperators, List.of(initialAgg, intermediateAgg, finalAgg)), - new TestResultPageSinkOperator(results::add) - ) - ); - List<List<Object>> values = new ArrayList<>(); - for (Page result : results) { - for (int p = 0; p < result.getPositionCount(); p++) { - int blockCount = result.getBlockCount(); - List<Object> row = new ArrayList<>(); - for (int b = 0; b < blockCount; b++) { - row.add(BlockUtils.toJavaObject(result.getBlock(b), p)); - } - values.add(row); - } - result.releaseBlocks(); - } - values.sort((v1, v2) -> { - for (int i = 0; i < v1.size(); i++) { - if (v1.get(i) instanceof BytesRef b1) { - int cmp = b1.compareTo((BytesRef) v2.get(i)); - if (cmp != 0) { - return cmp; - } - } - } else if (v1.get(i) instanceof Long b1) { - int cmp = b1.compareTo((Long) v2.get(i)); - if (cmp != 0) { - return -cmp; - } - } - } - return 0; - }); - return values; - } -} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 8ae06eef71df9..ac3157a082bcb 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -77,7 +77,7 @@ required_capability: metrics_command TS k8s | STATS max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | SORT time_bucket DESC | LIMIT 2; max(rate(network.total_bytes_in)): double | time_bucket:date -10.594594594594595 | 2024-05-10T00:20:00.000Z +6.980660660660663 | 2024-05-10T00:20:00.000Z 23.702205882352942 | 2024-05-10T00:15:00.000Z ; @@ -85,10 +85,10 @@ twoRatesWithBucket required_capability: metrics_command TS k8s | STATS max(rate(network.total_bytes_in)), sum(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | SORT time_bucket DESC | LIMIT 3; -max(rate(network.total_bytes_in)): double | sum(rate(network.total_bytes_in)): double | time_bucket:date -10.594594594594595 | 42.70864495221802 | 2024-05-10T00:20:00.000Z -23.702205882352942 | 112.36715680313907 | 2024-05-10T00:15:00.000Z -17.90625 | 85.18387414067914 | 2024-05-10T00:10:00.000Z +max(rate(network.total_bytes_in)):double | sum(rate(network.total_bytes_in)):double | time_bucket:datetime +6.980660660660663 | 23.959973363184154 | 2024-05-10T00:20:00.000Z +23.702205882352942 | 94.9517511187984 | 2024-05-10T00:15:00.000Z +14.97039381153305 | 63.00652190262822 | 2024-05-10T00:10:00.000Z ; @@ -96,13 +96,13 @@ oneRateWithBucketAndCluster required_capability: metrics_command TS k8s | STATS max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6; -max(rate(network.total_bytes_in)): double | time_bucket:date | cluster: keyword -10.594594594594595 | 2024-05-10T00:20:00.000Z | prod -5.586206896551724 | 2024-05-10T00:20:00.000Z | qa -5.37037037037037 | 2024-05-10T00:20:00.000Z | staging -15.913978494623656 | 2024-05-10T00:15:00.000Z | prod -23.702205882352942 | 2024-05-10T00:15:00.000Z | qa -9.823232323232324 | 
2024-05-10T00:15:00.000Z | staging +max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:keyword +6.980660660660663 | 2024-05-10T00:20:00.000Z | prod +4.05 | 2024-05-10T00:20:00.000Z | qa +3.19 | 2024-05-10T00:20:00.000Z | staging +11.860805860805861 | 2024-05-10T00:15:00.000Z | prod +23.702205882352942 | 2024-05-10T00:15:00.000Z | qa +7.784911616161616 | 2024-05-10T00:15:00.000Z | staging ; BytesAndCostByBucketAndCluster @@ -110,20 +110,20 @@ required_capability: metrics_command TS k8s | STATS max(rate(network.total_bytes_in)), max(network.cost) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6; max(rate(network.total_bytes_in)): double | max(network.cost): double | time_bucket:date | cluster: keyword -10.594594594594595 | 10.75 | 2024-05-10T00:20:00.000Z | prod -5.586206896551724 | 11.875 | 2024-05-10T00:20:00.000Z | qa -5.37037037037037 | 9.5 | 2024-05-10T00:20:00.000Z | staging -15.913978494623656 | 12.375 | 2024-05-10T00:15:00.000Z | prod +6.980660660660663 | 10.75 | 2024-05-10T00:20:00.000Z | prod +4.05 | 11.875 | 2024-05-10T00:20:00.000Z | qa +3.19 | 9.5 | 2024-05-10T00:20:00.000Z | staging +11.860805860805861 | 12.375 | 2024-05-10T00:15:00.000Z | prod 23.702205882352942 | 12.125 | 2024-05-10T00:15:00.000Z | qa -9.823232323232324 | 11.5 | 2024-05-10T00:15:00.000Z | staging +7.784911616161616 | 11.5 | 2024-05-10T00:15:00.000Z | staging ; oneRateWithBucketAndClusterThenFilter required_capability: metrics_command TS k8s | WHERE cluster=="prod" | STATS max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC | LIMIT 3; -max(rate(network.total_bytes_in)): double | time_bucket:date | cluster: keyword -10.594594594594595 | 2024-05-10T00:20:00.000Z | prod -15.913978494623656 | 2024-05-10T00:15:00.000Z | prod -11.562737642585551 | 2024-05-10T00:10:00.000Z | prod +max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:keyword +6.980660660660663 | 2024-05-10T00:20:00.000Z | prod +11.860805860805861 | 2024-05-10T00:15:00.000Z | prod +11.562737642585551 | 2024-05-10T00:10:00.000Z | prod ; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java index c52148c9a349a..7ae986df28782 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java @@ -365,6 +365,7 @@ record RateKey(String cluster, String host) { } } + @AwaitsFix(bugUrl = "removed?") public void testRateWithTimeBucket() { var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown(); record RateKey(String host, String cluster, long interval) {} @@ -459,6 +460,7 @@ record RateKey(String host, String cluster, long interval) {} } } + @AwaitsFix(bugUrl = "removed?") public void testRateWithTimeBucketAndCluster() { var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown(); record RateKey(String host, String cluster, long interval) {} From 03549915493e04f073ccfc71d02a886a6a88819e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 7 Apr 2025 10:10:25 -0700 Subject: [PATCH 3/6] Add comment --- .../aggregation/RateDoubleAggregator.java | 18 +++++++++++++----- 
.../aggregation/RateFloatAggregator.java | 18 +++++++++++++----- .../compute/aggregation/RateIntAggregator.java | 18 +++++++++++++----- .../aggregation/RateLongAggregator.java | 18 +++++++++++++----- .../aggregation/X-RateAggregator.java.st | 18 +++++++++++++----- 5 files changed, 65 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java index 49caa791fd1a0..7dff7493a5d44 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java @@ -316,7 +316,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive } } - private static double computeRate(DoubleRateState state, long unitInMillis) { + private static double computeRateWithoutExtrapolate(DoubleRateState state, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -331,7 +331,16 @@ private static double computeRate(DoubleRateState state, long unitInMillis) { return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); } - private static double computeRate(DoubleRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + /** + * Credit to PromQL for this extrapolation algorithm: + * If samples are close enough to the rangeStart and rangeEnd, we extrapolate the rate all the way to the boundary in question. + * "Close enough" is defined as "up to 10% more than the average duration between samples within the range". + * Essentially, we assume a more or less regular spacing between samples. If we don't see a sample where we would expect one, + * we assume the series does not cover the whole range but starts and/or ends within the range. + * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between + * samples (which is our guess for where the series actually starts or ends). 
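+ * <p>
+ * A worked example for illustration (hypothetical numbers; this assumes the average duration between samples
+ * is the sampled span divided by one less than the sample count): in a one-minute bucket [0ms, 60000ms) with
+ * two samples at 5000ms and 55000ms, the average duration is 50000ms, so both 5000ms boundary gaps are within
+ * 1.1x of it and the rate is extrapolated all the way to both boundaries. If the samples instead sat at 5000ms
+ * and 10000ms, the 50000ms end gap would exceed that threshold, and we would extrapolate past the last sample
+ * by only half the average duration (2500ms).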
+ */ + private static double extrapolateRate(DoubleRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -349,7 +358,6 @@ private static double computeRate(DoubleRateState state, long rangeStart, long r double startGap = firstTS - rangeStart; if (startGap > 0) { if (startGap > averageSampleInterval * 1.1) { - // limit to half of the average sample interval if samples are far from the boundary startGap = averageSampleInterval / 2.0; } firstValue = Math.max(0.0, firstValue - startGap * slope); @@ -377,9 +385,9 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { - rate = computeRate(state, unitInMillis); + rate = computeRateWithoutExtrapolate(state, unitInMillis); } rates.appendDouble(rate); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java index 1198305ea6863..32779503ffea6 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java @@ -317,7 +317,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive } } - private static double computeRate(FloatRateState state, long unitInMillis) { + private static double computeRateWithoutExtrapolate(FloatRateState state, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -332,7 +332,16 @@ private static double computeRate(FloatRateState state, long unitInMillis) { return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); } - private static double computeRate(FloatRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + /** + * Credit to PromQL for this extrapolation algorithm: + * If samples are close enough to the rangeStart and rangeEnd, we extrapolate the rate all the way to the boundary in question. + * "Close enough" is defined as "up to 10% more than the average duration between samples within the range". + * Essentially, we assume a more or less regular spacing between samples. If we don't see a sample where we would expect one, + * we assume the series does not cover the whole range but starts and/or ends within the range. + * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between + * samples (which is our guess for where the series actually starts or ends). 
+ */ + private static double extrapolateRate(FloatRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -350,7 +359,6 @@ private static double computeRate(FloatRateState state, long rangeStart, long ra double startGap = firstTS - rangeStart; if (startGap > 0) { if (startGap > averageSampleInterval * 1.1) { - // limit to half of the average sample interval if samples are far from the boundary startGap = averageSampleInterval / 2.0; } firstValue = Math.max(0.0, firstValue - startGap * slope); @@ -378,9 +386,9 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { - rate = computeRate(state, unitInMillis); + rate = computeRateWithoutExtrapolate(state, unitInMillis); } rates.appendDouble(rate); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java index 083619c8bfd28..67723ff71333a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java @@ -317,7 +317,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive } } - private static double computeRate(IntRateState state, long unitInMillis) { + private static double computeRateWithoutExtrapolate(IntRateState state, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -332,7 +332,16 @@ private static double computeRate(IntRateState state, long unitInMillis) { return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); } - private static double computeRate(IntRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + /** + * Credit to PromQL for this extrapolation algorithm: + * If samples are close enough to the rangeStart and rangeEnd, we extrapolate the rate all the way to the boundary in question. + * "Close enough" is defined as "up to 10% more than the average duration between samples within the range". + * Essentially, we assume a more or less regular spacing between samples. If we don't see a sample where we would expect one, + * we assume the series does not cover the whole range but starts and/or ends within the range. + * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between + * samples (which is our guess for where the series actually starts or ends). 
+ */ + private static double extrapolateRate(IntRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -350,7 +359,6 @@ private static double computeRate(IntRateState state, long rangeStart, long rang double startGap = firstTS - rangeStart; if (startGap > 0) { if (startGap > averageSampleInterval * 1.1) { - // limit to half of the average sample interval if samples are far from the boundary startGap = averageSampleInterval / 2.0; } firstValue = Math.max(0.0, firstValue - startGap * slope); @@ -378,9 +386,9 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { - rate = computeRate(state, unitInMillis); + rate = computeRateWithoutExtrapolate(state, unitInMillis); } rates.appendDouble(rate); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java index d9c5808b5deaf..67ea7fc185e1a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java @@ -316,7 +316,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive } } - private static double computeRate(LongRateState state, long unitInMillis) { + private static double computeRateWithoutExtrapolate(LongRateState state, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -331,7 +331,16 @@ private static double computeRate(LongRateState state, long unitInMillis) { return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); } - private static double computeRate(LongRateState state, long rangeStart, long rangeEnd, long unitInMillis) { + /** + * Credit to PromQL for this extrapolation algorithm: + * If samples are close enough to the rangeStart and rangeEnd, we extrapolate the rate all the way to the boundary in question. + * "Close enough" is defined as "up to 10% more than the average duration between samples within the range". + * Essentially, we assume a more or less regular spacing between samples. If we don't see a sample where we would expect one, + * we assume the series does not cover the whole range but starts and/or ends within the range. + * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between + * samples (which is our guess for where the series actually starts or ends). 
+ */ + private static double extrapolateRate(LongRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -349,7 +358,6 @@ private static double computeRate(LongRateState state, long rangeStart, long ran double startGap = firstTS - rangeStart; if (startGap > 0) { if (startGap > averageSampleInterval * 1.1) { - // limit to half of the average sample interval if samples are far from the boundary startGap = averageSampleInterval / 2.0; } firstValue = Math.max(0.0, firstValue - startGap * slope); @@ -377,9 +385,9 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { - rate = computeRate(state, unitInMillis); + rate = computeRateWithoutExtrapolate(state, unitInMillis); } rates.appendDouble(rate); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st index dec08d0d6cdd7..5273a77a53f93 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st @@ -321,7 +321,7 @@ public class Rate$Type$Aggregator { } } - private static double computeRate($Type$RateState state, long unitInMillis) { + private static double computeRateWithoutExtrapolate($Type$RateState state, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -336,7 +336,16 @@ public class Rate$Type$Aggregator { return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS); } - private static double computeRate($Type$RateState state, long rangeStart, long rangeEnd, long unitInMillis) { + /** + * Credit to PromQL for this extrapolation algorithm: + * If samples are close enough to the rangeStart and rangeEnd, we extrapolate the rate all the way to the boundary in question. + * "Close enough" is defined as "up to 10% more than the average duration between samples within the range". + * Essentially, we assume a more or less regular spacing between samples. If we don't see a sample where we would expect one, + * we assume the series does not cover the whole range but starts and/or ends within the range. + * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between + * samples (which is our guess for where the series actually starts or ends). 
+ */ + private static double extrapolateRate($Type$RateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; @@ -354,7 +363,6 @@ public class Rate$Type$Aggregator { double startGap = firstTS - rangeStart; if (startGap > 0) { if (startGap > averageSampleInterval * 1.1) { - // limit to half of the average sample interval if samples are far from the boundary startGap = averageSampleInterval / 2.0; } firstValue = Math.max(0.0, firstValue - startGap * slope); @@ -382,9 +390,9 @@ public class Rate$Type$Aggregator { int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = computeRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); } else { - rate = computeRate(state, unitInMillis); + rate = computeRateWithoutExtrapolate(state, unitInMillis); } rates.appendDouble(rate); } From 51e0b6b1f8b773e40e3522dc487e62c8f0d0a270 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 7 Apr 2025 10:43:25 -0700 Subject: [PATCH 4/6] Add tests for not enough samples --- .../main/resources/k8s-timeseries.csv-spec | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index ac3157a082bcb..9d2f7d6b15dd3 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -127,3 +127,21 @@ max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:ke 11.860805860805861 | 2024-05-10T00:15:00.000Z | prod 11.562737642585551 | 2024-05-10T00:10:00.000Z | prod ; + + +notEnoughSamples +required_capability: metrics_command +TS k8s | WHERE @timestamp <= "2024-05-10T00:06:14.000Z" | STATS max(rate(network.total_bytes_in)) BY pod, time_bucket = bucket(@timestamp,1minute) | SORT pod, time_bucket DESC | LIMIT 10; + +max(rate(network.total_bytes_in)):double | pod:keyword | time_bucket:datetime +null | one | 2024-05-10T00:06:00.000Z +0.075 | one | 2024-05-10T00:05:00.000Z +null | one | 2024-05-10T00:04:00.000Z +16.45 | one | 2024-05-10T00:03:00.000Z +null | one | 2024-05-10T00:01:00.000Z +null | three | 2024-05-10T00:06:00.000Z +null | three | 2024-05-10T00:05:00.000Z +1.534413580246913 | three | 2024-05-10T00:03:00.000Z +null | three | 2024-05-10T00:02:00.000Z +null | three | 2024-05-10T00:01:00.000Z +; From bffe2d291d12e81865d2331ef3a0aa7cf6bdf594 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 7 Apr 2025 10:45:17 -0700 Subject: [PATCH 5/6] Add assertions for minimum sample count --- .../elasticsearch/compute/aggregation/RateDoubleAggregator.java | 2 ++
.../elasticsearch/compute/aggregation/RateFloatAggregator.java | 2 ++ .../elasticsearch/compute/aggregation/RateIntAggregator.java | 2 ++ .../elasticsearch/compute/aggregation/RateLongAggregator.java | 2 ++ .../elasticsearch/compute/aggregation/X-RateAggregator.java.st | 2 ++ 5 files changed, 10 insertions(+) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java index 7dff7493a5d44..5dc2ea1655fbf 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java @@ -318,6 +318,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive private static double computeRateWithoutExtrapolate(DoubleRateState state, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; @@ -342,6 +343,7 @@ private static double computeRateWithoutExtrapolate(DoubleRateState state, long */ private static double extrapolateRate(DoubleRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java index 32779503ffea6..1bd4e4eb00e27 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java @@ -319,6 +319,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive private static double computeRateWithoutExtrapolate(FloatRateState state, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; @@ -343,6 +344,7 @@ private static double computeRateWithoutExtrapolate(FloatRateState state, long u */ private static double extrapolateRate(FloatRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java index 67723ff71333a..4789b7468f194 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java +++ 
b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java @@ -319,6 +319,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive private static double computeRateWithoutExtrapolate(IntRateState state, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; @@ -343,6 +344,7 @@ private static double computeRateWithoutExtrapolate(IntRateState state, long uni */ private static double extrapolateRate(IntRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java index 67ea7fc185e1a..2ab242226a56f 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java @@ -318,6 +318,7 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive private static double computeRateWithoutExtrapolate(LongRateState state, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; @@ -342,6 +343,7 @@ private static double computeRateWithoutExtrapolate(LongRateState state, long un */ private static double extrapolateRate(LongRateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st index 5273a77a53f93..babdca13bcc64 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st @@ -323,6 +323,7 @@ public class Rate$Type$Aggregator { private static double computeRateWithoutExtrapolate($Type$RateState state, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; @@ -347,6 +348,7 @@ public class Rate$Type$Aggregator { */ private static double extrapolateRate($Type$RateState state, long rangeStart, long rangeEnd, long unitInMillis) { final int len = state.entries(); + assert len >= 2 : "rate requires at least two samples; got 
" + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; final long lastTS = state.timestamps[0]; double reset = state.reset; From f4510060e714075031a453438c4255e226cac267 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 7 Apr 2025 12:28:50 -0700 Subject: [PATCH 6/6] style --- .../compute/aggregation/RateDoubleAggregator.java | 7 ++++++- .../compute/aggregation/RateFloatAggregator.java | 7 ++++++- .../compute/aggregation/RateIntAggregator.java | 7 ++++++- .../compute/aggregation/RateLongAggregator.java | 7 ++++++- .../compute/aggregation/X-RateAggregator.java.st | 7 ++++++- 5 files changed, 30 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java index 5dc2ea1655fbf..b09f2e90417f0 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleAggregator.java @@ -387,7 +387,12 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis + ); } else { rate = computeRateWithoutExtrapolate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java index 1bd4e4eb00e27..ca5a3d8f7ab95 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateFloatAggregator.java @@ -388,7 +388,12 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis + ); } else { rate = computeRateWithoutExtrapolate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java index 4789b7468f194..860b1b8c1b45a 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntAggregator.java @@ -388,7 +388,12 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof 
TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis + ); } else { rate = computeRateWithoutExtrapolate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java index 2ab242226a56f..8f74def3c949f 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongAggregator.java @@ -387,7 +387,12 @@ Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext eval int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis + ); } else { rate = computeRateWithoutExtrapolate(state, unitInMillis); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st index babdca13bcc64..a584dce56e905 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateAggregator.java.st @@ -392,7 +392,12 @@ public class Rate$Type$Aggregator { int len = state.entries(); final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(groupId), tsContext.rangeEndInMillis(groupId), unitInMillis); + rate = extrapolateRate( + state, + tsContext.rangeStartInMillis(groupId), + tsContext.rangeEndInMillis(groupId), + unitInMillis + ); } else { rate = computeRateWithoutExtrapolate(state, unitInMillis); }
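
Reviewer note: below is a minimal, self-contained sketch of the boundary extrapolation described in the Javadoc above, useful for sanity-checking the arithmetic outside the aggregator. It is illustrative only, not the shipped implementation: the class and method names are hypothetical, samples are plain ascending arrays (the aggregator stores timestamps in descending order), reset/counter-wrap handling is omitted, and the final division by the full range duration is an assumption about how the extrapolated delta is normalized.

import java.util.Locale;

final class RateExtrapolationSketch {
    // Hypothetical stand-alone version of extrapolateRate; illustrative only.
    static double extrapolatedRate(long[] timestamps, double[] values, long rangeStart, long rangeEnd, long unitInMillis) {
        final int len = timestamps.length;
        assert len >= 2 : "rate requires at least two samples; got " + len;
        final long firstTS = timestamps[0];      // ascending order in this sketch
        final long lastTS = timestamps[len - 1];
        double firstValue = values[0];
        double lastValue = values[len - 1];
        final double slope = (lastValue - firstValue) / (lastTS - firstTS);
        // Average spacing between the samples we actually observed.
        final double averageSampleInterval = (double) (lastTS - firstTS) / (len - 1);
        double startGap = firstTS - rangeStart;
        if (startGap > 0) {
            if (startGap > averageSampleInterval * 1.1) {
                // Series starts well inside the range: extrapolate only half an average interval.
                startGap = averageSampleInterval / 2.0;
            }
            // Counters never go below zero, so clamp the extrapolated start value.
            firstValue = Math.max(0.0, firstValue - startGap * slope);
        }
        double endGap = rangeEnd - lastTS;
        if (endGap > 0) {
            if (endGap > averageSampleInterval * 1.1) {
                endGap = averageSampleInterval / 2.0;
            }
            lastValue = lastValue + endGap * slope;
        }
        // Assumption: normalize by the full range once values are extrapolated toward its boundaries.
        return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart);
    }

    public static void main(String[] args) {
        long[] ts = { 10_000L, 30_000L, 50_000L }; // samples at 10s, 30s, 50s
        double[] vs = { 1.0, 3.0, 5.0 };           // counter rising at 0.1 per second
        // Both gaps to the [0s, 60s) boundaries are 10s < 22s, so we extrapolate fully: prints 0.1000.
        System.out.println(String.format(Locale.ROOT, "%.4f", extrapolatedRate(ts, vs, 0L, 60_000L, 1_000L)));
    }
}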