diff --git a/muted-tests.yml b/muted-tests.yml index 8162a1009f27e..1020767f43707 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -480,6 +480,12 @@ tests: - class: org.elasticsearch.xpack.esql.inference.rerank.RerankOperatorTests method: testSimpleCircuitBreaking issue: https://github.com/elastic/elasticsearch/issues/133619 +- class: org.elasticsearch.xpack.kql.parser.KqlParserBooleanQueryTests + method: testParseOrQuery + issue: https://github.com/elastic/elasticsearch/issues/133863 +- class: org.elasticsearch.xpack.kql.parser.KqlParserBooleanQueryTests + method: testParseAndQuery + issue: https://github.com/elastic/elasticsearch/issues/133871 - class: org.elasticsearch.xpack.ml.integration.InferenceIT method: testInferClassificationModel issue: https://github.com/elastic/elasticsearch/issues/133448 diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle index aa249a8883382..4a7dd9e986873 100644 --- a/x-pack/plugin/esql/compute/build.gradle +++ b/x-pack/plugin/esql/compute/build.gradle @@ -491,6 +491,28 @@ tasks.named('stringTemplates').configure { } } + File irateAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-IrateAggregator.java.st") + template { + it.properties = intProperties + it.inputFile = irateAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/IrateIntAggregator.java" + } + template { + it.properties = longProperties + it.inputFile = irateAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/IrateLongAggregator.java" + } + template { + it.properties = floatProperties + it.inputFile = irateAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/IrateFloatAggregator.java" + } + template { + it.properties = doubleProperties + it.inputFile = irateAggregatorInputFile + it.outputFile = "org/elasticsearch/compute/aggregation/IrateDoubleAggregator.java" + } + File fallibleStateInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleState.java.st") template { it.properties = booleanProperties @@ -601,6 +623,7 @@ tasks.named('stringTemplates').configure { it.inputFile = fallibleArrayStateInputFile it.outputFile = "org/elasticsearch/compute/aggregation/FloatFallibleArrayState.java" } + File valuesAggregatorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st") template { it.properties = intProperties diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateDoubleAggregator.java new file mode 100644 index 0000000000000..7c6af73d719bd --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateDoubleAggregator.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * A rate grouping aggregation definition for double. + * This class is generated. Edit `X-IrateAggregator.java.st` instead. + */ +@GroupingAggregator( + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "DOUBLE_BLOCK") } +) +public class IrateDoubleAggregator { + public static DoubleIrateGroupingState initGrouping(DriverContext driverContext) { + return new DoubleIrateGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(DoubleIrateGroupingState current, int groupId, double value, long timestamp) { + current.ensureCapacity(groupId); + current.append(groupId, timestamp, value); + } + + public static void combineIntermediate( + DoubleIrateGroupingState current, + int groupId, + LongBlock timestamps, + DoubleBlock values, + int otherPosition + ) { + current.combine(groupId, timestamps, values, otherPosition); + } + + public static Block evaluateFinal(DoubleIrateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + private static class DoubleIrateState { + static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(DoubleIrateState.class); + long lastTimestamp; + long secondLastTimestamp = -1; + double lastValue; + double secondLastValue; + boolean hasSecond; + + DoubleIrateState(long lastTimestamp, double lastValue) { + this.lastTimestamp = lastTimestamp; + this.lastValue = lastValue; + this.hasSecond = false; + } + + long bytesUsed() { + return BASE_RAM_USAGE; + } + } + + public static final class DoubleIrateGroupingState implements Releasable, Accountable, GroupingAggregatorState { + private ObjectArray<DoubleIrateState> states; + private final BigArrays bigArrays; + private final CircuitBreaker breaker; + private long stateBytes; // for individual states + + DoubleIrateGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.bigArrays = bigArrays; + this.breaker = breaker; + this.states = bigArrays.newObjectArray(1); + } + + void ensureCapacity(int groupId) { + states = bigArrays.grow(states, groupId + 1); + } + + void adjustBreaker(long bytes) { + breaker.addEstimateBytesAndMaybeBreak(bytes, "<<irate aggregation>>"); + stateBytes += bytes; + assert stateBytes >= 0 : stateBytes; + } + + void append(int groupId, long timestamp, double value) { + var state = states.get(groupId); + if (state == null) { + state = new DoubleIrateState(timestamp, value); + states.set(groupId, state); +
adjustBreaker(state.bytesUsed()); + } else { + // We only need the last two values, but we need to keep them sorted by timestamp. + if (timestamp > state.lastTimestamp) { + // new timestamp is the most recent + state.secondLastTimestamp = state.lastTimestamp; + state.secondLastValue = state.lastValue; + state.lastTimestamp = timestamp; + state.lastValue = value; + state.hasSecond = true; + } else if (timestamp > state.secondLastTimestamp) { + // new timestamp is the second most recent + state.secondLastTimestamp = timestamp; + state.secondLastValue = value; + state.hasSecond = true; + } // else: ignore, too old + } + } + + void combine(int groupId, LongBlock timestamps, DoubleBlock values, int otherPosition) { + final int valueCount = timestamps.getValueCount(otherPosition); + if (valueCount == 0) { + return; + } + final int firstTs = timestamps.getFirstValueIndex(otherPosition); + final int firstIndex = values.getFirstValueIndex(otherPosition); + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs), values.getDouble(firstIndex)); + if (valueCount > 1) { + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs + 1), values.getDouble(firstIndex + 1)); + } + } + + @Override + public long ramBytesUsed() { + return states.ramBytesUsed() + stateBytes; + } + + @Override + public void close() { + Releasables.close(states, () -> adjustBreaker(-stateBytes)); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset; + final BlockFactory blockFactory = driverContext.blockFactory(); + final int positionCount = selected.getPositionCount(); + try ( + LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); + DoubleBlock.Builder values = blockFactory.newDoubleBlockBuilder(positionCount * 2); + ) { + for (int i = 0; i < positionCount; i++) { + final var groupId = selected.getInt(i); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state != null) { + timestamps.beginPositionEntry(); + timestamps.appendLong(state.lastTimestamp); + if (state.hasSecond) { + timestamps.appendLong(state.secondLastTimestamp); + } + timestamps.endPositionEntry(); + + values.beginPositionEntry(); + values.appendDouble(state.lastValue); + if (state.hasSecond) { + values.appendDouble(state.secondLastValue); + } + values.endPositionEntry(); + } else { + timestamps.appendNull(); + values.appendNull(); + } + } + blocks[offset] = timestamps.build(); + blocks[offset + 1] = values.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + int positionCount = selected.getPositionCount(); + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + for (int p = 0; p < positionCount; p++) { + final var groupId = selected.getInt(p); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state == null || state.hasSecond == false) { + rates.appendNull(); + continue; + } + // When the last value is less than the previous one, we assume a reset + // and use the last value directly. + final double ydiff = state.lastValue >= state.secondLastValue + ? 
state.lastValue - state.secondLastValue + : state.lastValue; + final long xdiff = state.lastTimestamp - state.secondLastTimestamp; + rates.appendDouble(ydiff / xdiff * 1000); + } + return rates.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateFloatAggregator.java new file mode 100644 index 0000000000000..c4ce7a2955d73 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateFloatAggregator.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * A rate grouping aggregation definition for float. + * This class is generated. Edit `X-IrateAggregator.java.st` instead. 
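+ * <p>For example, given a previous sample (t=1000ms, v=5.0) and a latest sample (t=2000ms, v=9.0),
+ * the instant rate is (9.0 - 5.0) / (2000 - 1000) * 1000 = 4.0 per second. If the latest value is
+ * smaller than the previous one, a counter reset is assumed and the latest value alone is used as
+ * the delta.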
+ */ +@GroupingAggregator( + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "FLOAT_BLOCK") } +) +public class IrateFloatAggregator { + public static FloatIrateGroupingState initGrouping(DriverContext driverContext) { + return new FloatIrateGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(FloatIrateGroupingState current, int groupId, float value, long timestamp) { + current.ensureCapacity(groupId); + current.append(groupId, timestamp, value); + } + + public static void combineIntermediate( + FloatIrateGroupingState current, + int groupId, + LongBlock timestamps, + FloatBlock values, + int otherPosition + ) { + current.combine(groupId, timestamps, values, otherPosition); + } + + public static Block evaluateFinal(FloatIrateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + private static class FloatIrateState { + static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(FloatIrateState.class); + long lastTimestamp; + long secondLastTimestamp = -1; + float lastValue; + float secondLastValue; + boolean hasSecond; + + FloatIrateState(long lastTimestamp, float lastValue) { + this.lastTimestamp = lastTimestamp; + this.lastValue = lastValue; + this.hasSecond = false; + } + + long bytesUsed() { + return BASE_RAM_USAGE; + } + } + + public static final class FloatIrateGroupingState implements Releasable, Accountable, GroupingAggregatorState { + private ObjectArray<FloatIrateState> states; + private final BigArrays bigArrays; + private final CircuitBreaker breaker; + private long stateBytes; // for individual states + + FloatIrateGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.bigArrays = bigArrays; + this.breaker = breaker; + this.states = bigArrays.newObjectArray(1); + } + + void ensureCapacity(int groupId) { + states = bigArrays.grow(states, groupId + 1); + } + + void adjustBreaker(long bytes) { + breaker.addEstimateBytesAndMaybeBreak(bytes, "<<irate aggregation>>"); + stateBytes += bytes; + assert stateBytes >= 0 : stateBytes; + } + + void append(int groupId, long timestamp, float value) { + var state = states.get(groupId); + if (state == null) { + state = new FloatIrateState(timestamp, value); + states.set(groupId, state); + adjustBreaker(state.bytesUsed()); + } else { + // We only need the last two values, but we need to keep them sorted by timestamp.
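+ // Out-of-order samples are supported: anything older than both retained timestamps is dropped.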
+ if (timestamp > state.lastTimestamp) { + // new timestamp is the most recent + state.secondLastTimestamp = state.lastTimestamp; + state.secondLastValue = state.lastValue; + state.lastTimestamp = timestamp; + state.lastValue = value; + state.hasSecond = true; + } else if (timestamp > state.secondLastTimestamp) { + // new timestamp is the second most recent + state.secondLastTimestamp = timestamp; + state.secondLastValue = value; + state.hasSecond = true; + } // else: ignore, too old + } + } + + void combine(int groupId, LongBlock timestamps, FloatBlock values, int otherPosition) { + final int valueCount = timestamps.getValueCount(otherPosition); + if (valueCount == 0) { + return; + } + final int firstTs = timestamps.getFirstValueIndex(otherPosition); + final int firstIndex = values.getFirstValueIndex(otherPosition); + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs), values.getFloat(firstIndex)); + if (valueCount > 1) { + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs + 1), values.getFloat(firstIndex + 1)); + } + } + + @Override + public long ramBytesUsed() { + return states.ramBytesUsed() + stateBytes; + } + + @Override + public void close() { + Releasables.close(states, () -> adjustBreaker(-stateBytes)); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset; + final BlockFactory blockFactory = driverContext.blockFactory(); + final int positionCount = selected.getPositionCount(); + try ( + LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); + FloatBlock.Builder values = blockFactory.newFloatBlockBuilder(positionCount * 2); + ) { + for (int i = 0; i < positionCount; i++) { + final var groupId = selected.getInt(i); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state != null) { + timestamps.beginPositionEntry(); + timestamps.appendLong(state.lastTimestamp); + if (state.hasSecond) { + timestamps.appendLong(state.secondLastTimestamp); + } + timestamps.endPositionEntry(); + + values.beginPositionEntry(); + values.appendFloat(state.lastValue); + if (state.hasSecond) { + values.appendFloat(state.secondLastValue); + } + values.endPositionEntry(); + } else { + timestamps.appendNull(); + values.appendNull(); + } + } + blocks[offset] = timestamps.build(); + blocks[offset + 1] = values.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + int positionCount = selected.getPositionCount(); + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + for (int p = 0; p < positionCount; p++) { + final var groupId = selected.getInt(p); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state == null || state.hasSecond == false) { + rates.appendNull(); + continue; + } + // When the last value is less than the previous one, we assume a reset + // and use the last value directly. + final double ydiff = state.lastValue >= state.secondLastValue + ? 
state.lastValue - state.secondLastValue + : state.lastValue; + final long xdiff = state.lastTimestamp - state.secondLastTimestamp; + rates.appendDouble(ydiff / xdiff * 1000); + } + return rates.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateIntAggregator.java new file mode 100644 index 0000000000000..1cf6a69e0ecb6 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateIntAggregator.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * A rate grouping aggregation definition for int. + * This class is generated. Edit `X-IrateAggregator.java.st` instead. 
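+ * <p>Values are accumulated as int, but the final rate is always computed and emitted as a double.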
+ */ +@GroupingAggregator( + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "INT_BLOCK") } +) +public class IrateIntAggregator { + public static IntIrateGroupingState initGrouping(DriverContext driverContext) { + return new IntIrateGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(IntIrateGroupingState current, int groupId, int value, long timestamp) { + current.ensureCapacity(groupId); + current.append(groupId, timestamp, value); + } + + public static void combineIntermediate( + IntIrateGroupingState current, + int groupId, + LongBlock timestamps, + IntBlock values, + int otherPosition + ) { + current.combine(groupId, timestamps, values, otherPosition); + } + + public static Block evaluateFinal(IntIrateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + private static class IntIrateState { + static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(IntIrateState.class); + long lastTimestamp; + long secondLastTimestamp = -1; + int lastValue; + int secondLastValue; + boolean hasSecond; + + IntIrateState(long lastTimestamp, int lastValue) { + this.lastTimestamp = lastTimestamp; + this.lastValue = lastValue; + this.hasSecond = false; + } + + long bytesUsed() { + return BASE_RAM_USAGE; + } + } + + public static final class IntIrateGroupingState implements Releasable, Accountable, GroupingAggregatorState { + private ObjectArray<IntIrateState> states; + private final BigArrays bigArrays; + private final CircuitBreaker breaker; + private long stateBytes; // for individual states + + IntIrateGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.bigArrays = bigArrays; + this.breaker = breaker; + this.states = bigArrays.newObjectArray(1); + } + + void ensureCapacity(int groupId) { + states = bigArrays.grow(states, groupId + 1); + } + + void adjustBreaker(long bytes) { + breaker.addEstimateBytesAndMaybeBreak(bytes, "<<irate aggregation>>"); + stateBytes += bytes; + assert stateBytes >= 0 : stateBytes; + } + + void append(int groupId, long timestamp, int value) { + var state = states.get(groupId); + if (state == null) { + state = new IntIrateState(timestamp, value); + states.set(groupId, state); + adjustBreaker(state.bytesUsed()); + } else { + // We only need the last two values, but we need to keep them sorted by timestamp.
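+ // Only the two most recent samples are retained, so per-group state stays constant-size.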
+ if (timestamp > state.lastTimestamp) { + // new timestamp is the most recent + state.secondLastTimestamp = state.lastTimestamp; + state.secondLastValue = state.lastValue; + state.lastTimestamp = timestamp; + state.lastValue = value; + state.hasSecond = true; + } else if (timestamp > state.secondLastTimestamp) { + // new timestamp is the second most recent + state.secondLastTimestamp = timestamp; + state.secondLastValue = value; + state.hasSecond = true; + } // else: ignore, too old + } + } + + void combine(int groupId, LongBlock timestamps, IntBlock values, int otherPosition) { + final int valueCount = timestamps.getValueCount(otherPosition); + if (valueCount == 0) { + return; + } + final int firstTs = timestamps.getFirstValueIndex(otherPosition); + final int firstIndex = values.getFirstValueIndex(otherPosition); + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs), values.getInt(firstIndex)); + if (valueCount > 1) { + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs + 1), values.getInt(firstIndex + 1)); + } + } + + @Override + public long ramBytesUsed() { + return states.ramBytesUsed() + stateBytes; + } + + @Override + public void close() { + Releasables.close(states, () -> adjustBreaker(-stateBytes)); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset; + final BlockFactory blockFactory = driverContext.blockFactory(); + final int positionCount = selected.getPositionCount(); + try ( + LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); + IntBlock.Builder values = blockFactory.newIntBlockBuilder(positionCount * 2); + ) { + for (int i = 0; i < positionCount; i++) { + final var groupId = selected.getInt(i); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state != null) { + timestamps.beginPositionEntry(); + timestamps.appendLong(state.lastTimestamp); + if (state.hasSecond) { + timestamps.appendLong(state.secondLastTimestamp); + } + timestamps.endPositionEntry(); + + values.beginPositionEntry(); + values.appendInt(state.lastValue); + if (state.hasSecond) { + values.appendInt(state.secondLastValue); + } + values.endPositionEntry(); + } else { + timestamps.appendNull(); + values.appendNull(); + } + } + blocks[offset] = timestamps.build(); + blocks[offset + 1] = values.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + int positionCount = selected.getPositionCount(); + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + for (int p = 0; p < positionCount; p++) { + final var groupId = selected.getInt(p); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state == null || state.hasSecond == false) { + rates.appendNull(); + continue; + } + // When the last value is less than the previous one, we assume a reset + // and use the last value directly. + final double ydiff = state.lastValue >= state.secondLastValue + ? 
state.lastValue - state.secondLastValue + : state.lastValue; + final long xdiff = state.lastTimestamp - state.secondLastTimestamp; + rates.appendDouble(ydiff / xdiff * 1000); + } + return rates.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateLongAggregator.java new file mode 100644 index 0000000000000..44419e5db116e --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IrateLongAggregator.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * A rate grouping aggregation definition for long. + * This class is generated. Edit `X-IrateAggregator.java.st` instead. 
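+ * <p>The intermediate state is a pair of multi-valued blocks holding up to two timestamps and
+ * values per group, as described by the {@code @IntermediateState} annotation below.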
+ */ +@GroupingAggregator( + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "LONG_BLOCK") } +) +public class IrateLongAggregator { + public static LongIrateGroupingState initGrouping(DriverContext driverContext) { + return new LongIrateGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(LongIrateGroupingState current, int groupId, long value, long timestamp) { + current.ensureCapacity(groupId); + current.append(groupId, timestamp, value); + } + + public static void combineIntermediate( + LongIrateGroupingState current, + int groupId, + LongBlock timestamps, + LongBlock values, + int otherPosition + ) { + current.combine(groupId, timestamps, values, otherPosition); + } + + public static Block evaluateFinal(LongIrateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + private static class LongIrateState { + static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(LongIrateState.class); + long lastTimestamp; + long secondLastTimestamp = -1; + long lastValue; + long secondLastValue; + boolean hasSecond; + + LongIrateState(long lastTimestamp, long lastValue) { + this.lastTimestamp = lastTimestamp; + this.lastValue = lastValue; + this.hasSecond = false; + } + + long bytesUsed() { + return BASE_RAM_USAGE; + } + } + + public static final class LongIrateGroupingState implements Releasable, Accountable, GroupingAggregatorState { + private ObjectArray<LongIrateState> states; + private final BigArrays bigArrays; + private final CircuitBreaker breaker; + private long stateBytes; // for individual states + + LongIrateGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.bigArrays = bigArrays; + this.breaker = breaker; + this.states = bigArrays.newObjectArray(1); + } + + void ensureCapacity(int groupId) { + states = bigArrays.grow(states, groupId + 1); + } + + void adjustBreaker(long bytes) { + breaker.addEstimateBytesAndMaybeBreak(bytes, "<<irate aggregation>>"); + stateBytes += bytes; + assert stateBytes >= 0 : stateBytes; + } + + void append(int groupId, long timestamp, long value) { + var state = states.get(groupId); + if (state == null) { + state = new LongIrateState(timestamp, value); + states.set(groupId, state); + adjustBreaker(state.bytesUsed()); + } else { + // We only need the last two values, but we need to keep them sorted by timestamp.
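+ // Note: a duplicate of the latest timestamp would be stored as the second-last sample and yield
+ // a zero time delta in evaluateFinal; timestamps are assumed to be distinct within a group.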
+ if (timestamp > state.lastTimestamp) { + // new timestamp is the most recent + state.secondLastTimestamp = state.lastTimestamp; + state.secondLastValue = state.lastValue; + state.lastTimestamp = timestamp; + state.lastValue = value; + state.hasSecond = true; + } else if (timestamp > state.secondLastTimestamp) { + // new timestamp is the second most recent + state.secondLastTimestamp = timestamp; + state.secondLastValue = value; + state.hasSecond = true; + } // else: ignore, too old + } + } + + void combine(int groupId, LongBlock timestamps, LongBlock values, int otherPosition) { + final int valueCount = timestamps.getValueCount(otherPosition); + if (valueCount == 0) { + return; + } + final int firstTs = timestamps.getFirstValueIndex(otherPosition); + final int firstIndex = values.getFirstValueIndex(otherPosition); + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs), values.getLong(firstIndex)); + if (valueCount > 1) { + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs + 1), values.getLong(firstIndex + 1)); + } + } + + @Override + public long ramBytesUsed() { + return states.ramBytesUsed() + stateBytes; + } + + @Override + public void close() { + Releasables.close(states, () -> adjustBreaker(-stateBytes)); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset; + final BlockFactory blockFactory = driverContext.blockFactory(); + final int positionCount = selected.getPositionCount(); + try ( + LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); + LongBlock.Builder values = blockFactory.newLongBlockBuilder(positionCount * 2); + ) { + for (int i = 0; i < positionCount; i++) { + final var groupId = selected.getInt(i); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state != null) { + timestamps.beginPositionEntry(); + timestamps.appendLong(state.lastTimestamp); + if (state.hasSecond) { + timestamps.appendLong(state.secondLastTimestamp); + } + timestamps.endPositionEntry(); + + values.beginPositionEntry(); + values.appendLong(state.lastValue); + if (state.hasSecond) { + values.appendLong(state.secondLastValue); + } + values.endPositionEntry(); + } else { + timestamps.appendNull(); + values.appendNull(); + } + } + blocks[offset] = timestamps.build(); + blocks[offset + 1] = values.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + int positionCount = selected.getPositionCount(); + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + for (int p = 0; p < positionCount; p++) { + final var groupId = selected.getInt(p); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state == null || state.hasSecond == false) { + rates.appendNull(); + continue; + } + // When the last value is less than the previous one, we assume a reset + // and use the last value directly. + final double ydiff = state.lastValue >= state.secondLastValue + ? 
state.lastValue - state.secondLastValue + : state.lastValue; + final long xdiff = state.lastTimestamp - state.secondLastTimestamp; + rates.appendDouble(ydiff / xdiff * 1000); + } + return rates.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateDoubleAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateDoubleAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..c2a0c69464696 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateDoubleAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link IrateDoubleAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class IrateDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public IrateDoubleAggregatorFunctionSupplier() { + } + + @Override + public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List<IntermediateStateDesc> groupingIntermediateStateDesc() { + return IrateDoubleGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public IrateDoubleGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List<Integer> channels) { + return IrateDoubleGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "irate of doubles"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateDoubleGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateDoubleGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..6075a98f24f5a --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateDoubleGroupingAggregatorFunction.java @@ -0,0 +1,390 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0.
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link IrateDoubleAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class IrateDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.DOUBLE) ); + + private final IrateDoubleAggregator.DoubleIrateGroupingState state; + + private final List<Integer> channels; + + private final DriverContext driverContext; + + public IrateDoubleGroupingAggregatorFunction(List<Integer> channels, + IrateDoubleAggregator.DoubleIrateGroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static IrateDoubleGroupingAggregatorFunction create(List<Integer> channels, + DriverContext driverContext) { + return new IrateDoubleGroupingAggregatorFunction(channels, IrateDoubleAggregator.initGrouping(driverContext), driverContext); + } + + public static List<IntermediateStateDesc> intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + DoubleBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + DoubleVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int
positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, DoubleBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + double valueValue = valueBlock.getDouble(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateDoubleAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, DoubleVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + double valueValue = valueVector.getDouble(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateDoubleAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + DoubleBlock values = (DoubleBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if 
(groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + IrateDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, DoubleBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + double valueValue = valueBlock.getDouble(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateDoubleAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, DoubleVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + double valueValue = valueVector.getDouble(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateDoubleAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + DoubleBlock values = (DoubleBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + 
positionOffset; + IrateDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + double valueValue = valueBlock.getDouble(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateDoubleAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + int groupId = groups.getInt(groupPosition); + double valueValue = valueVector.getDouble(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateDoubleAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + DoubleBlock values = (DoubleBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + IrateDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, DoubleBlock valueBlock, + LongBlock timestampBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = IrateDoubleAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { 
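+ // Renders e.g. "IrateDoubleGroupingAggregatorFunction[channels=[0, 1]]".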
+ StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateFloatAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateFloatAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..5c656ad734e20 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateFloatAggregatorFunctionSupplier.java @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link IrateFloatAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class IrateFloatAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public IrateFloatAggregatorFunctionSupplier() { + } + + @Override + public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public List<IntermediateStateDesc> groupingIntermediateStateDesc() { + return IrateFloatGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) { + throw new UnsupportedOperationException("non-grouping aggregator is not supported"); + } + + @Override + public IrateFloatGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, + List<Integer> channels) { + return IrateFloatGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "irate of floats"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateFloatGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateFloatGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..d4e98548cca5a --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateFloatGroupingAggregatorFunction.java @@ -0,0 +1,390 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0.
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.FloatVector; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link IrateFloatAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class IrateFloatGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.FLOAT) ); + + private final IrateFloatAggregator.FloatIrateGroupingState state; + + private final List<Integer> channels; + + private final DriverContext driverContext; + + public IrateFloatGroupingAggregatorFunction(List<Integer> channels, + IrateFloatAggregator.FloatIrateGroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static IrateFloatGroupingAggregatorFunction create(List<Integer> channels, + DriverContext driverContext) { + return new IrateFloatGroupingAggregatorFunction(channels, IrateFloatAggregator.initGrouping(driverContext), driverContext); + } + + public static List<IntermediateStateDesc> intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + FloatBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + FloatVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int
positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, FloatBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + float valueValue = valueBlock.getFloat(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateFloatAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, FloatVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + float valueValue = valueVector.getFloat(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateFloatAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + FloatBlock values = (FloatBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if 
(groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + IrateFloatAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, FloatBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + float valueValue = valueBlock.getFloat(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateFloatAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, FloatVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + float valueValue = valueVector.getFloat(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateFloatAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + FloatBlock values = (FloatBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; 
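+ // Merge this position's partial (timestamp, value) pairs into the local group
+ // state; append() keeps just the two most recent samples, ordered by timestamp.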
+ IrateFloatAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, FloatBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + float valueValue = valueBlock.getFloat(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateFloatAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, FloatVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + int groupId = groups.getInt(groupPosition); + float valueValue = valueVector.getFloat(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateFloatAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + FloatBlock values = (FloatBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + IrateFloatAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, FloatBlock valueBlock, + LongBlock timestampBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = IrateFloatAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new 
StringBuilder();
+    sb.append(getClass().getSimpleName()).append("[");
+    sb.append("channels=").append(channels);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public void close() {
+    state.close();
+  }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateIntAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateIntAggregatorFunctionSupplier.java
new file mode 100644
index 0000000000000..2392b6cfdaf0c
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateIntAggregatorFunctionSupplier.java
@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.util.List;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link AggregatorFunctionSupplier} implementation for {@link IrateIntAggregator}.
+ * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
+ */
+public final class IrateIntAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
+  public IrateIntAggregatorFunctionSupplier() {
+  }
+
+  @Override
+  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
+    return IrateIntGroupingAggregatorFunction.intermediateStateDesc();
+  }
+
+  @Override
+  public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public IrateIntGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
+      List<Integer> channels) {
+    return IrateIntGroupingAggregatorFunction.create(channels, driverContext);
+  }
+
+  @Override
+  public String describe() {
+    return "irate of ints";
+  }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateIntGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateIntGroupingAggregatorFunction.java
new file mode 100644
index 0000000000000..5802fa22225e3
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateIntGroupingAggregatorFunction.java
@@ -0,0 +1,389 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
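+// A minimal wiring sketch (channel numbers hypothetical): these suppliers are
+// grouping-only, and expect the value column in channel 0 and the timestamp
+// column in channel 1, matching prepareProcessRawInputPage below.
+//
+//   AggregatorFunctionSupplier supplier = new IrateIntAggregatorFunctionSupplier();
+//   GroupingAggregatorFunction irate = supplier.groupingAggregator(driverContext, List.of(0, 1));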
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link IrateIntAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class IrateIntGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.INT) ); + + private final IrateIntAggregator.IntIrateGroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public IrateIntGroupingAggregatorFunction(List channels, + IrateIntAggregator.IntIrateGroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static IrateIntGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new IrateIntGroupingAggregatorFunction(channels, IrateIntAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + IntBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + IntVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, 
timestampBlock); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, IntBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + int valueValue = valueBlock.getInt(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateIntAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, IntVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueValue = valueVector.getInt(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateIntAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + IntBlock values = (IntBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = 
groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + IrateIntAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, IntBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + int valueValue = valueBlock.getInt(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateIntAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, IntVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueValue = valueVector.getInt(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateIntAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + IntBlock values = (IntBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + IrateIntAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int 
positionOffset, IntVector groups, IntBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + int valueValue = valueBlock.getInt(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateIntAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, IntVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + int groupId = groups.getInt(groupPosition); + int valueValue = valueVector.getInt(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateIntAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + IntBlock values = (IntBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + IrateIntAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, IntBlock valueBlock, + LongBlock timestampBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = IrateIntAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + 
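+ // Closing the function releases the per-group irate states and returns their
+ // tracked bytes to the circuit breaker (see close() in X-IrateAggregator.java.st).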
@Override
+  public void close() {
+    state.close();
+  }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateLongAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateLongAggregatorFunctionSupplier.java
new file mode 100644
index 0000000000000..92cc7bdc7b2fc
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateLongAggregatorFunctionSupplier.java
@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.util.List;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link AggregatorFunctionSupplier} implementation for {@link IrateLongAggregator}.
+ * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
+ */
+public final class IrateLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
+  public IrateLongAggregatorFunctionSupplier() {
+  }
+
+  @Override
+  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
+    return IrateLongGroupingAggregatorFunction.intermediateStateDesc();
+  }
+
+  @Override
+  public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public IrateLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
+      List<Integer> channels) {
+    return IrateLongGroupingAggregatorFunction.create(channels, driverContext);
+  }
+
+  @Override
+  public String describe() {
+    return "irate of longs";
+  }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateLongGroupingAggregatorFunction.java
new file mode 100644
index 0000000000000..bfac0fc0771e7
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/IrateLongGroupingAggregatorFunction.java
@@ -0,0 +1,388 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
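+// In the long specialization both input channels are LongBlocks (values and
+// timestamps alike), so the channel order (value in 0, timestamp in 1) is the
+// only thing telling them apart; see prepareProcessRawInputPage below.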
+package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link IrateLongAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class IrateLongGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.LONG) ); + + private final IrateLongAggregator.LongIrateGroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public IrateLongGroupingAggregatorFunction(List channels, + IrateLongAggregator.LongIrateGroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static IrateLongGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new IrateLongGroupingAggregatorFunction(channels, IrateLongAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + LongBlock valueBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + LongVector valueVector = valueBlock.asVector(); + if (valueVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + LongVector timestampVector = timestampBlock.asVector(); + if (timestampVector == null) { + maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueBlock, timestampBlock); + } + + @Override + 
public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valueVector, timestampVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, LongBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + long valueValue = valueBlock.getLong(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateLongAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, LongVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + long valueValue = valueVector.getLong(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateLongAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + LongBlock values = (LongBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + 
groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + IrateLongAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, LongBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + long valueValue = valueBlock.getLong(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateLongAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, LongVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + long valueValue = valueVector.getLong(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateLongAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + LongBlock values = (LongBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + IrateLongAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + } + + private void addRawInput(int 
positionOffset, IntVector groups, LongBlock valueBlock, + LongBlock timestampBlock) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (valueBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int valueStart = valueBlock.getFirstValueIndex(valuesPosition); + int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); + for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { + long valueValue = valueBlock.getLong(valueOffset); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + IrateLongAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, LongVector valueVector, + LongVector timestampVector) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + int groupId = groups.getInt(groupPosition); + long valueValue = valueVector.getLong(valuesPosition); + long timestampValue = timestampVector.getLong(valuesPosition); + IrateLongAggregator.combine(state, groupId, valueValue, timestampValue); + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongBlock timestamps = (LongBlock) timestampsUncast; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + LongBlock values = (LongBlock) valuesUncast; + assert timestamps.getPositionCount() == values.getPositionCount(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + IrateLongAggregator.combineIntermediate(state, groupId, timestamps, values, valuesPosition); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, LongBlock valueBlock, + LongBlock timestampBlock) { + if (valueBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = IrateLongAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } 
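+ // Note that irate's enableGroupIdTracking is a no-op: missing groups are
+ // handled by emitting nulls in toIntermediate and evaluateFinal instead
+ // (see X-IrateAggregator.java.st).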
+ + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-IrateAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-IrateAggregator.java.st new file mode 100644 index 0000000000000..4300c9f6c6c73 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-IrateAggregator.java.st @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +// begin generated imports +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.FloatBlock; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +// end generated imports + +/** + * A rate grouping aggregation definition for $type$. + * This class is generated. Edit `X-IrateAggregator.java.st` instead. 
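+ * In short: per group the state keeps only the two most recent samples
+ * {@code (t1, v1)} and {@code (t2, v2)} with {@code t2 > t1}, and the final value
+ * is {@code (v2 - v1) / (t2 - t1) * 1000}, a per-second rate assuming millisecond
+ * timestamps. When {@code v2 < v1} the counter is assumed to have reset, and
+ * {@code v2} alone is used as the delta.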
+ */ +@GroupingAggregator( + value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "$TYPE$_BLOCK") } +) +public class Irate$Type$Aggregator { + public static $Type$IrateGroupingState initGrouping(DriverContext driverContext) { + return new $Type$IrateGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine($Type$IrateGroupingState current, int groupId, $type$ value, long timestamp) { + current.ensureCapacity(groupId); + current.append(groupId, timestamp, value); + } + + public static void combineIntermediate( + $Type$IrateGroupingState current, + int groupId, + LongBlock timestamps, + $Type$Block values, + int otherPosition + ) { + current.combine(groupId, timestamps, values, otherPosition); + } + + public static Block evaluateFinal($Type$IrateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + return state.evaluateFinal(selected, evalContext); + } + + private static class $Type$IrateState { + static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject($Type$IrateState.class); + long lastTimestamp; + long secondLastTimestamp = -1; + $type$ lastValue; + $type$ secondLastValue; + boolean hasSecond; + + $Type$IrateState(long lastTimestamp, $type$ lastValue) { + this.lastTimestamp = lastTimestamp; + this.lastValue = lastValue; + this.hasSecond = false; + } + + long bytesUsed() { + return BASE_RAM_USAGE; + } + } + + public static final class $Type$IrateGroupingState implements Releasable, Accountable, GroupingAggregatorState { + private ObjectArray<$Type$IrateState> states; + private final BigArrays bigArrays; + private final CircuitBreaker breaker; + private long stateBytes; // for individual states + + $Type$IrateGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.bigArrays = bigArrays; + this.breaker = breaker; + this.states = bigArrays.newObjectArray(1); + } + + void ensureCapacity(int groupId) { + states = bigArrays.grow(states, groupId + 1); + } + + void adjustBreaker(long bytes) { + breaker.addEstimateBytesAndMaybeBreak(bytes, "<>"); + stateBytes += bytes; + assert stateBytes >= 0 : stateBytes; + } + + void append(int groupId, long timestamp, $type$ value) { + var state = states.get(groupId); + if (state == null) { + state = new $Type$IrateState(timestamp, value); + states.set(groupId, state); + adjustBreaker(state.bytesUsed()); + } else { + // We only need the last two values, but we need to keep them sorted by timestamp. 
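+ // For example (hypothetical samples): with state (lastTimestamp=2000, lastValue=13),
+ // an incoming (3000, 17) moves (2000, 13) into the second-last slot; an incoming
+ // (1500, 11) only replaces the second-last slot; and a sample at or before the
+ // current second-last timestamp is ignored.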
+ if (timestamp > state.lastTimestamp) { + // new timestamp is the most recent + state.secondLastTimestamp = state.lastTimestamp; + state.secondLastValue = state.lastValue; + state.lastTimestamp = timestamp; + state.lastValue = value; + state.hasSecond = true; + } else if (timestamp > state.secondLastTimestamp) { + // new timestamp is the second most recent + state.secondLastTimestamp = timestamp; + state.secondLastValue = value; + state.hasSecond = true; + } // else: ignore, too old + } + } + + void combine(int groupId, LongBlock timestamps, $Type$Block values, int otherPosition) { + final int valueCount = timestamps.getValueCount(otherPosition); + if (valueCount == 0) { + return; + } + final int firstTs = timestamps.getFirstValueIndex(otherPosition); + final int firstIndex = values.getFirstValueIndex(otherPosition); + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs), values.get$Type$(firstIndex)); + if (valueCount > 1) { + ensureCapacity(groupId); + append(groupId, timestamps.getLong(firstTs + 1), values.get$Type$(firstIndex + 1)); + } + } + + @Override + public long ramBytesUsed() { + return states.ramBytesUsed() + stateBytes; + } + + @Override + public void close() { + Releasables.close(states, () -> adjustBreaker(-stateBytes)); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset; + final BlockFactory blockFactory = driverContext.blockFactory(); + final int positionCount = selected.getPositionCount(); + try ( + LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2); + $Type$Block.Builder values = blockFactory.new$Type$BlockBuilder(positionCount * 2); + ) { + for (int i = 0; i < positionCount; i++) { + final var groupId = selected.getInt(i); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state != null) { + timestamps.beginPositionEntry(); + timestamps.appendLong(state.lastTimestamp); + if (state.hasSecond) { + timestamps.appendLong(state.secondLastTimestamp); + } + timestamps.endPositionEntry(); + + values.beginPositionEntry(); + values.append$Type$(state.lastValue); + if (state.hasSecond) { + values.append$Type$(state.secondLastValue); + } + values.endPositionEntry(); + } else { + timestamps.appendNull(); + values.appendNull(); + } + } + blocks[offset] = timestamps.build(); + blocks[offset + 1] = values.build(); + } + } + + Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) { + int positionCount = selected.getPositionCount(); + try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + for (int p = 0; p < positionCount; p++) { + final var groupId = selected.getInt(p); + final var state = groupId < states.size() ? states.get(groupId) : null; + if (state == null || state.hasSecond == false) { + rates.appendNull(); + continue; + } + // When the last value is less than the previous one, we assume a reset + // and use the last value directly. + final double ydiff = state.lastValue >= state.secondLastValue + ? 
state.lastValue - state.secondLastValue + : state.lastValue; + final long xdiff = state.lastTimestamp - state.secondLastTimestamp; + rates.appendDouble(ydiff / xdiff * 1000); + } + return rates.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` + } + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-irate.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-irate.csv-spec new file mode 100644 index 0000000000000..2fb818caad04d --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-irate.csv-spec @@ -0,0 +1,212 @@ +irate_of_long_no_grouping +required_capability: metrics_command +TS k8s +| STATS irate_bytes_in=avg(irate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,1minute) +| SORT irate_bytes_in DESC, time_bucket DESC | LIMIT 10; + +irate_bytes_in:double | time_bucket:datetime +null | 2024-05-10T00:01:00.000Z +140.58333333333331 | 2024-05-10T00:02:00.000Z +134.5314713064713 | 2024-05-10T00:09:00.000Z +116.41911764705883 | 2024-05-10T00:22:00.000Z +112.83333333333333 | 2024-05-10T00:00:00.000Z +93.43529411764706 | 2024-05-10T00:14:00.000Z +88.6875 | 2024-05-10T00:11:00.000Z +78.83333333333333 | 2024-05-10T00:13:00.000Z +71.04464285714286 | 2024-05-10T00:15:00.000Z +51.58823529411765 | 2024-05-10T00:19:00.000Z + +; + +irate_of_long_grouping +required_capability: metrics_command +TS k8s +| STATS irate_bytes_in=avg(irate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp,5minute) +| SORT irate_bytes_in DESC, time_bucket, cluster | LIMIT 10; + +irate_bytes_in:double | cluster:keyword | time_bucket:datetime +284.63440860215053 | qa | 2024-05-10T00:05:00.000Z +119.52228682170542 | prod | 2024-05-10T00:20:00.000Z +62.32120383036936 | prod | 2024-05-10T00:10:00.000Z +31.92871485943775 | prod | 2024-05-10T00:00:00.000Z +30.83898647284474 | staging | 2024-05-10T00:00:00.000Z +28.57226890756303 | qa | 2024-05-10T00:15:00.000Z +26.055199430199433 | staging | 2024-05-10T00:05:00.000Z +21.898989322941418 | staging | 2024-05-10T00:15:00.000Z +14.23272442880286 | qa | 2024-05-10T00:00:00.000Z +10.889987485115794 | staging | 2024-05-10T00:10:00.000Z + + +; + +irate_of_double_no_grouping +required_capability: metrics_command +TS k8s +| STATS irate_cost=sum(irate(network.total_cost)) BY time_bucket = bucket(@timestamp,1minute) +| SORT irate_cost DESC, time_bucket | LIMIT 10; + +irate_cost:double | time_bucket:datetime +null | 2024-05-10T00:01:00.000Z +7.836832264957264 | 2024-05-10T00:09:00.000Z +3.590324074074074 | 2024-05-10T00:17:00.000Z +2.6708333333333334 | 2024-05-10T00:02:00.000Z +2.2916666666666665 | 2024-05-10T00:08:00.000Z +2.265625 | 2024-05-10T00:11:00.000Z +2.2481617647058822 | 2024-05-10T00:22:00.000Z +2.020833333333333 | 2024-05-10T00:00:00.000Z +1.951470588235294 | 2024-05-10T00:14:00.000Z +1.8680555555555556 | 2024-05-10T00:13:00.000Z + +; + +irate_with_filtering +required_capability: metrics_command +TS k8s | WHERE pod == "one" +| STATS irate_bytes_in = sum(irate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| SORT time_bucket, cluster | LIMIT 10; + +irate_bytes_in:double | cluster:keyword | time_bucket:datetime +0.07692307692307693 | prod | 2024-05-10T00:00:00.000Z +830.0 | qa | 2024-05-10T00:00:00.000Z +31.375 | staging | 2024-05-10T00:00:00.000Z +9.854545454545454 | prod | 2024-05-10T00:10:00.000Z 
+18.700000000000003 | qa | 2024-05-10T00:10:00.000Z +0.023952095808383235 | staging | 2024-05-10T00:10:00.000Z +232.75 | prod | 2024-05-10T00:20:00.000Z +3.2698412698412698 | qa | 2024-05-10T00:20:00.000Z +4.407407407407407 | staging | 2024-05-10T00:20:00.000Z + + +; + +eval_on_irate +required_capability: metrics_command +TS k8s +| STATS irate_bytes = avg(irate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| EVAL irate_kb = irate_bytes / 1024.0 +| LIMIT 10 | SORT time_bucket, cluster ; + +irate_bytes:double | cluster:keyword | time_bucket:datetime | irate_kb:double +4.37482276552044 | prod | 2024-05-10T00:00:00.000Z | 0.004272287856953555 +284.63440860215053 | qa | 2024-05-10T00:00:00.000Z | 0.2779632896505376 +26.055199430199433 | staging | 2024-05-10T00:00:00.000Z | 0.025444530693554134 +9.893214497920377 | prod | 2024-05-10T00:10:00.000Z | 0.009661342283125368 +28.57226890756303 | qa | 2024-05-10T00:10:00.000Z | 0.02790260635504202 +21.898989322941418 | staging | 2024-05-10T00:10:00.000Z | 0.02138573176068498 +119.52228682170542 | prod | 2024-05-10T00:20:00.000Z | 0.1167209832243217 +4.428024083196497 | qa | 2024-05-10T00:20:00.000Z | 0.0043242422687465795 +1.5050835148874364 | staging | 2024-05-10T00:20:00.000Z | 0.0014698081200072621 + +; + +irate_of_aggregate_metric +required_capability: metrics_command +TS k8s-downsampled +| STATS sum_bytes = sum(irate(network.total_bytes_in)), + max_bytes = max(irate(network.total_bytes_in)), + min_bytes = min(irate(network.total_bytes_in)), + avg_bytes = avg(irate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp, 30minute) +| SORT time_bucket | LIMIT 10; + +sum_bytes:double | max_bytes:double | min_bytes:double | avg_bytes:double | time_bucket:datetime +1.145 | 0.39 | 0.008333333333333333 | 0.12722222222222224 | 2024-05-09T23:30:00.000Z + +; + +irate_of_expression +required_capability: metrics_command +TS k8s +| STATS irate_bytes_in=avg(irate(network.total_bytes_in) + 10) BY time_bucket = bucket(@timestamp,1minute) +| SORT irate_bytes_in DESC, time_bucket DESC | LIMIT 10; + +irate_bytes_in:double | time_bucket:datetime +null | 2024-05-10T00:01:00.000Z +150.58333333333331 | 2024-05-10T00:02:00.000Z +144.5314713064713 | 2024-05-10T00:09:00.000Z +126.41911764705883 | 2024-05-10T00:22:00.000Z +122.83333333333333 | 2024-05-10T00:00:00.000Z +103.43529411764706 | 2024-05-10T00:14:00.000Z +98.6875 | 2024-05-10T00:11:00.000Z +88.83333333333333 | 2024-05-10T00:13:00.000Z +81.04464285714286 | 2024-05-10T00:15:00.000Z +61.58823529411765 | 2024-05-10T00:19:00.000Z + +; + +irate_combined_avg +required_capability: metrics_command +TS k8s +| STATS avg_irate_bytes = avg(irate(network.total_bytes_in)), avg_irate_cost = avg(irate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| EVAL ratio = avg_irate_bytes / avg_irate_cost +| SORT time_bucket, cluster | LIMIT 10; + +avg_irate_bytes:double | avg_irate_cost:double | cluster:keyword | time_bucket:datetime | ratio:double +4.37482276552044 | 0.12927101967799642 | prod | 2024-05-10T00:00:00.000Z | 33.84225464004049 +284.63440860215053 | 2.112455197132616 | qa | 2024-05-10T00:00:00.000Z | 134.74103923647934 +26.055199430199433 | 0.46879451566951563 | staging | 2024-05-10T00:00:00.000Z | 55.57914727946065 +9.893214497920377 | 0.18585561497326206 | prod | 2024-05-10T00:10:00.000Z | 53.23064626992117 +28.57226890756303 | 0.20140056022408961 | qa | 2024-05-10T00:10:00.000Z | 141.8678720445063 +21.898989322941418 | 0.2425173462598612 | staging | 
+119.52228682170542 | 1.0260416666666667 | prod | 2024-05-10T00:20:00.000Z | 116.48872624247431
+4.428024083196497 | 0.0808531746031746 | qa | 2024-05-10T00:20:00.000Z | 54.76623651364503
+1.5050835148874364 | 0.12549927378358752 | staging | 2024-05-10T00:20:00.000Z | 11.992766726943941
+;
+
+irate_combined_sum
+required_capability: metrics_command
+TS k8s
+| STATS sum_irate_bytes = sum(irate(network.total_bytes_in)), sum_irate_cost = sum(irate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp, 10minute)
+| EVAL ratio = sum_irate_bytes / sum_irate_cost
+| SORT time_bucket, cluster | LIMIT 10;
+
+sum_irate_bytes:double | sum_irate_cost:double | cluster:keyword | time_bucket:datetime | ratio:double
+13.12446829656132 | 0.38781305903398927 | prod | 2024-05-10T00:00:00.000Z | 33.84225464004049
+853.9032258064516 | 6.337365591397849 | qa | 2024-05-10T00:00:00.000Z | 134.74103923647934
+78.1655982905983 | 1.4063835470085468 | staging | 2024-05-10T00:00:00.000Z | 55.57914727946065
+29.679643493761134 | 0.5575668449197861 | prod | 2024-05-10T00:10:00.000Z | 53.23064626992118
+85.71680672268909 | 0.6042016806722689 | qa | 2024-05-10T00:10:00.000Z | 141.86787204450627
+65.69696796882425 | 0.7275520387795836 | staging | 2024-05-10T00:10:00.000Z | 90.29865146007454
+239.04457364341084 | 2.0520833333333335 | prod | 2024-05-10T00:20:00.000Z | 116.48872624247431
+8.856048166392995 | 0.1617063492063492 | qa | 2024-05-10T00:20:00.000Z | 54.76623651364503
+4.515250544662309 | 0.37649782135076254 | staging | 2024-05-10T00:20:00.000Z | 11.992766726943941
+;
+
+irate_of_ratio
+required_capability: metrics_command
+TS k8s
+| STATS irate_of_ratio = sum(irate(network.total_cost) / irate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute)
+| SORT time_bucket, cluster | LIMIT 10;
+
+irate_of_ratio:double | cluster:keyword | time_bucket:datetime
+0.7377812779572093 | prod | 2024-05-10T00:00:00.000Z
+0.15560960316361233 | qa | 2024-05-10T00:00:00.000Z
+0.05581954089819596 | staging | 2024-05-10T00:00:00.000Z
+3.088611728967339 | prod | 2024-05-10T00:10:00.000Z
+0.019280051363230983 | qa | 2024-05-10T00:10:00.000Z
+0.17121614905482155 | staging | 2024-05-10T00:10:00.000Z
+0.021697562872698986 | prod | 2024-05-10T00:20:00.000Z
+0.04152807743018099 | qa | 2024-05-10T00:20:00.000Z
+3.7694327731092434 | staging | 2024-05-10T00:20:00.000Z
+
+;
+
+irate_of_long_grouping_2min_nulls
+required_capability: metrics_command
+TS k8s
+| STATS irate_bytes_in=avg(irate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp,2minute)
+| SORT irate_bytes_in NULLS FIRST, time_bucket, cluster | LIMIT 10;
+
+irate_bytes_in:double | cluster:keyword | time_bucket:datetime
+null | qa | 2024-05-10T00:00:00.000Z
+null | staging | 2024-05-10T00:00:00.000Z
+null | prod | 2024-05-10T00:06:00.000Z
+null | prod | 2024-05-10T00:10:00.000Z
+null | staging | 2024-05-10T00:10:00.000Z
+null | qa | 2024-05-10T00:22:00.000Z
+0.08823529411764706 | staging | 2024-05-10T00:22:00.000Z
+0.3 | prod | 2024-05-10T00:04:00.000Z
+0.875 | qa | 2024-05-10T00:12:00.000Z
+4.300925925925926 | qa | 2024-05-10T00:02:00.000Z
+
+;
+
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
index 4ae65eadf88e5..afc4d42634dd0 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
@@ -56,8 +56,8 @@
 @SuppressWarnings("unchecked")
 @ESIntegTestCase.ClusterScope(maxNumDataNodes = 1)
 public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
-    private static final Long NUM_DOCS = 500L;
-    private static final Long TIME_RANGE_SECONDS = 900L;
+    private static final Long NUM_DOCS = 2000L;
+    private static final Long TIME_RANGE_SECONDS = 3600L;
     private static final String DATASTREAM_NAME = "tsit_ds";
     private static final Integer SECONDS_IN_WINDOW = 60;
     private static final List<Tuple<String, Integer>> WINDOW_OPTIONS = List.of(
@@ -71,6 +71,10 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         Tuple.tuple("30 minutes", 1800),
         Tuple.tuple("1 hour", 3600)
     );
+    private static final List<Tuple<String, DeltaAgg>> DELTA_AGG_OPTIONS = List.of(
+        Tuple.tuple("rate", DeltaAgg.RATE),
+        Tuple.tuple("irate", DeltaAgg.IRATE)
+    );
     private List documents;
     private TSDataGenerationHelper dataGenerationHelper;
@@ -211,6 +215,19 @@ static List<String> getRowKey(List<Object> row, List<String> groupingAttributes,
         return rowKey;
     }
 
+    static Integer getTimestampIndex(String esqlQuery) {
+        // first we get the stats command after the pipe
+        var statsIndex = esqlQuery.indexOf("| STATS");
+        var nextPipe = esqlQuery.indexOf("|", statsIndex + 1);
+
+        var statsCommand = esqlQuery.substring(statsIndex, nextPipe);
+        // then we count the number of commas before "BY "
+        var byTbucketIndex = statsCommand.indexOf("BY ");
+        var statsPart = statsCommand.substring(0, byTbucketIndex);
+        // the aggregate columns come first, so the timestamp bucket's index equals their count: commas + 1
+        return (int) statsPart.chars().filter(ch -> ch == ',').count() + 1;
+    }
+
     @Override
     public EsqlQueryResponse run(EsqlQueryRequest request) {
         assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot());
@@ -243,12 +260,18 @@ public int compareToFindingMax(RateRange o) {
         }
     }
 
+    enum DeltaAgg {
+        RATE,
+        IRATE
+    }
+
     // A record that holds min, max, avg, count and sum of rates calculated from a timeseries.
     record RateStats(Long count, RateRange max, RateRange avg, RateRange min, RateRange sum) {}
 
-    static RateStats calculateRateAggregation(
+    static RateStats calculateDeltaAggregation(
         Collection<List<Tuple<?, Tuple<Instant, Double>>>> allTimeseries,
-        Integer secondsInWindow
+        Integer secondsInWindow,
+        DeltaAgg deltaAgg
     ) {
         List<RateRange> allRates = allTimeseries.stream().map(timeseries -> {
             if (timeseries.size() < 2) {
@@ -258,6 +281,15 @@ static RateStats calculateRateAggregation(
             timeseries.sort((t1, t2) -> t1.v2().v1().compareTo(t2.v2().v1()));
             var firstTs = timeseries.getFirst().v2().v1();
             var lastTs = timeseries.getLast().v2().v1();
+            if (deltaAgg.equals(DeltaAgg.IRATE)) {
+                var lastVal = timeseries.getLast().v2().v2();
+                var secondLastVal = timeseries.get(timeseries.size() - 2).v2().v2();
+                var irate = (lastVal >= secondLastVal ? lastVal - secondLastVal : lastVal) / (lastTs.toEpochMilli() - timeseries.get(
+                    timeseries.size() - 2
+                ).v2().v1().toEpochMilli()) * 1000;
+                return new RateRange(irate * 0.999, irate * 1.001); // Add 0.1% tolerance
+            }
+            assert deltaAgg == DeltaAgg.RATE;
             Double lastValue = null;
             Double counterGrowth = 0.0;
             for (Tuple<?, Tuple<Instant, Double>> point : timeseries) {
@@ -365,7 +397,8 @@ void assertNoFailedWindows(List<String> failedWindows, List<List<Object>> rows)
      * The test checks the count, max, min, and avg values of the rate metric against
      * the same values calculated from the documents in the group.
      */
-    public void testRateGroupBySubset() {
+    public void testRateOrIrateGroupBySubset() {
+        var deltaAgg = ESTestCase.randomFrom(DELTA_AGG_OPTIONS);
         var window = ESTestCase.randomFrom(WINDOW_OPTIONS);
         var windowSize = window.v2();
         var windowStr = window.v1();
@@ -373,25 +406,25 @@ public void testRateGroupBySubset() {
         var dimensionsStr = dimensions.isEmpty()
             ? ""
             : ", " + dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
-        try (var resp = run(String.format(Locale.ROOT, """
+        var query = String.format(Locale.ROOT, """
             TS %s
-            | STATS count(rate(metrics.counterl_hdd.bytes.read)),
-                max(rate(metrics.counterl_hdd.bytes.read)),
-                avg(rate(metrics.counterl_hdd.bytes.read)),
-                min(rate(metrics.counterl_hdd.bytes.read)),
-                sum(rate(metrics.counterl_hdd.bytes.read))
+            | STATS count(<DELTA_AGG>(metrics.counterl_hdd.bytes.read)),
+                max(<DELTA_AGG>(metrics.counterl_hdd.bytes.read)),
+                avg(<DELTA_AGG>(metrics.counterl_hdd.bytes.read)),
+                min(<DELTA_AGG>(metrics.counterl_hdd.bytes.read)),
+                sum(<DELTA_AGG>(metrics.counterl_hdd.bytes.read))
             BY tbucket=bucket(@timestamp, %s) %s
             | SORT tbucket
-            | LIMIT 1000
-            """, DATASTREAM_NAME, windowStr, dimensionsStr))) {
+            """, DATASTREAM_NAME, windowStr, dimensionsStr).replaceAll("<DELTA_AGG>", deltaAgg.v1());
+        try (var resp = run(query)) {
             List<List<Object>> rows = consumeRows(resp);
             List<String> failedWindows = new ArrayList<>();
             var groups = groupedRows(documents, dimensions, windowSize);
             for (List<Object> row : rows) {
-                var rowKey = getRowKey(row, dimensions, 5);
+                var rowKey = getRowKey(row, dimensions, getTimestampIndex(query));
                 var windowDataPoints = groups.get(rowKey);
                 var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counterl_hdd.bytes.read");
-                var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), windowSize);
+                var rateAgg = calculateDeltaAggregation(docsPerTimeseries.values(), windowSize, deltaAgg.v2());
                 try {
                     assertThat(row.getFirst(), equalTo(rateAgg.count));
                     checkWithin((Double) row.get(1), rateAgg.max);
@@ -422,7 +455,6 @@ public void testRateGroupByNothing() {
                 min(rate(metrics.counterl_hdd.bytes.read))
             BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
-            | LIMIT 1000
             """, DATASTREAM_NAME))) {
             List<List<Object>> rows = consumeRows(resp);
             List<String> failedWindows = new ArrayList<>();
@@ -430,7 +462,7 @@ public void testRateGroupByNothing() {
             var windowStart = windowStart(row.get(4), SECONDS_IN_WINDOW);
             var windowDataPoints = groups.get(List.of(Long.toString(windowStart)));
             var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counterl_hdd.bytes.read");
-            var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
+            var rateAgg = calculateDeltaAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW, DeltaAgg.RATE);
             try {
                 assertThat(row.getFirst(), equalTo(rateAgg.count));
                 checkWithin((Double) row.get(1), rateAgg.max);
@@ -469,12 +501,12 @@ public void testGaugeGroupByRandomAndRandomAgg() {
                 %s
             BY tbucket=bucket(@timestamp, %s) %s
             | SORT tbucket
-            | LIMIT 1000""", DATASTREAM_NAME, metricName, aggExpression, windowStr, dimensionsStr);
+            """, DATASTREAM_NAME, metricName, aggExpression, windowStr, dimensionsStr);
         try (EsqlQueryResponse resp = run(query)) {
             var groups = groupedRows(documents, dimensions, windowSize);
             List<List<Object>> rows = consumeRows(resp);
             for (List<Object> row : rows) {
-                var rowKey = getRowKey(row, dimensions, 1);
+                var rowKey = getRowKey(row, dimensions, getTimestampIndex(query));
                 var tsGroups = groupByTimeseries(groups.get(rowKey), metricName);
                 Object expectedVal = aggregatePerTimeseries(tsGroups, selectedAggs.get(0), selectedAggs.get(1));
                 Double actualVal = switch (row.get(0)) {
@@ -524,7 +556,7 @@ public void testGaugeGroupByRandomAndRandomAgg() {
     public void testGroupBySubset() {
         var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics);
         var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
-        try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
+        var query = String.format(Locale.ROOT, """
             TS %s
             | STATS max(max_over_time(metrics.gaugel_hdd.bytes.used)),
@@ -535,11 +567,12 @@ public void testGroupBySubset() {
                 count(count_over_time(metrics.gaugel_hdd.bytes.used))
             BY tbucket=bucket(@timestamp, 1 minute), %s
             | SORT tbucket
-            | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) {
+            """, DATASTREAM_NAME, dimensionsStr);
+        try (EsqlQueryResponse resp = run(query)) {
            var groups = groupedRows(documents, dimensions, 60);
            List<List<Object>> rows = consumeRows(resp);
            for (List<Object> row : rows) {
-                var rowKey = getRowKey(row, dimensions, 6);
+                var rowKey = getRowKey(row, dimensions, getTimestampIndex(query));
                var tsGroups = groupByTimeseries(groups.get(rowKey), "gaugel_hdd.bytes.used");
                Function<Object, Double> toDouble = cell -> switch (cell) {
                    case Long l -> l.doubleValue();
@@ -576,7 +609,7 @@ public void testGroupByNothing() {
                 count(count_over_time(metrics.gaugel_hdd.bytes.used))
             BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
-            | LIMIT 1000""", DATASTREAM_NAME))) {
+            """, DATASTREAM_NAME))) {
             List<List<Object>> rows = consumeRows(resp);
             var groups = groupedRows(documents, List.of(), 60);
             for (List<Object> row : rows) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
index ab570e4833fb4..4f6c87eb3ec77 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.First;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Irate;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Last;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
@@ -505,6 +506,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
             def(First.class, bi(First::new), "first"),
             def(Last.class, bi(Last::new), "last"),
             def(Rate.class, uni(Rate::new), "rate"),
+            def(Irate.class, uni(Irate::new), "irate"),
             def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
             def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
             def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java
index bc2ddc90591ef..fd799c6f47128 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java
@@ -25,6 +25,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
         Min.ENTRY,
         Percentile.ENTRY,
         Rate.ENTRY,
+        Irate.ENTRY,
         Sample.ENTRY,
         SpatialCentroid.ENTRY,
         SpatialExtent.ENTRY,
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Irate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Irate.java
new file mode 100644
index 0000000000000..203b0f0043f94
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Irate.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.aggregate;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.IrateDoubleAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.IrateIntAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.IrateLongAggregatorFunctionSupplier;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.Example;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionType;
+import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.planner.ToAggregator;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
+
+public class Irate extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator {
+    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Irate", Irate::new);
+
+    private final Expression timestamp;
+
+    @FunctionInfo(
+        type = FunctionType.TIME_SERIES_AGGREGATE,
+        returnType = { "double" },
+        description = "The irate of a counter field. irate is the per-second rate of increase between the last two data points ("
" + + "This function is very similar to rate, but is more responsive to recent changes in the rate of increase.", + appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE) }, + note = "Available with the [TS](/reference/query-languages/esql/commands/source-commands.md#esql-ts) command in snapshot builds", + examples = { @Example(file = "k8s-timeseries", tag = "irate") } + ) + public Irate(Source source, @Param(name = "field", type = { "counter_long", "counter_integer", "counter_double" }) Expression field) { + this(source, field, new UnresolvedAttribute(source, "@timestamp")); + } + + public Irate( + Source source, + @Param(name = "field", type = { "counter_long", "counter_integer", "counter_double" }) Expression field, + Expression timestamp + ) { + this(source, field, Literal.TRUE, timestamp); + } + + // compatibility constructor used when reading from the stream + private Irate(Source source, Expression field, Expression filter, List children) { + this(source, field, filter, children.getFirst()); + } + + private Irate(Source source, Expression field, Expression filter, Expression timestamp) { + super(source, field, filter, List.of(timestamp)); + this.timestamp = timestamp; + } + + public Irate(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + in.readNamedWriteable(Expression.class), + in.readNamedWriteable(Expression.class), + in.readNamedWriteableCollectionAsList(Expression.class) + ); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, Irate::new, field(), timestamp); + } + + @Override + public Irate replaceChildren(List newChildren) { + if (newChildren.size() != 3) { + assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; + throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); + } + return new Irate(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + } + + @Override + public Irate withFilter(Expression filter) { + return new Irate(source(), field(), filter, timestamp); + } + + @Override + public DataType dataType() { + return DataType.DOUBLE; + } + + @Override + protected TypeResolution resolveType() { + return isType(field(), dt -> DataType.isCounter(dt), sourceText(), FIRST, "counter_long", "counter_integer", "counter_double"); + } + + @Override + public AggregatorFunctionSupplier supplier() { + final DataType type = field().dataType(); + return switch (type) { + case COUNTER_LONG -> new IrateLongAggregatorFunctionSupplier(); + case COUNTER_INTEGER -> new IrateIntAggregatorFunctionSupplier(); + case COUNTER_DOUBLE -> new IrateDoubleAggregatorFunctionSupplier(); + default -> throw EsqlIllegalArgumentException.illegalDataType(type); + }; + } + + @Override + public Irate perTimeSeriesAggregation() { + return this; + } + + @Override + public String toString() { + return "irate(" + field() + ")"; + } +} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index 90eb44b36c17d..605baef08a4e5 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -129,8 +129,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: 
diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml
index 90eb44b36c17d..605baef08a4e5 100644
--- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml
+++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml
@@ -129,8 +129,7 @@ setup:
     - match: {esql.functions.coalesce: $functions_coalesce}
     - gt: {esql.functions.categorize: $functions_categorize}
     # Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation.
-    - length: {esql.functions: 173} # check the "sister" test below for a likely update to the same esql.functions length check
-
+    - length: {esql.functions: 174} # check the "sister" test below for a likely update to the same esql.functions length check
 ---
 "Basic ESQL usage output (telemetry) non-snapshot version":
 - requires: