// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;

import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.lang.StringBuilder;
import java.util.List;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;

/**
 * {@link AggregatorFunction} implementation for {@link FirstValueBytesRefAggregator}.
 * This class is generated. Edit {@code AggregatorImplementer} instead.
 */
public final class FirstValueBytesRefAggregatorFunction implements AggregatorFunction {
  // Intermediate representation: the first value seen, the @timestamp it arrived with,
  // and a "seen" flag distinguishing "no input" from "first value happened to be empty".
  private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
      new IntermediateStateDesc("value", ElementType.BYTES_REF),
      new IntermediateStateDesc("timestamp", ElementType.LONG),
      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );

  private final DriverContext driverContext;

  // NOTE(review): the state type is named FirstValueLongSingleState even for the
  // BytesRef aggregator — the name comes from FirstValueBytesRefAggregator itself.
  private final FirstValueBytesRefAggregator.FirstValueLongSingleState state;

  private final List<Integer> channels;

  public FirstValueBytesRefAggregatorFunction(DriverContext driverContext,
      List<Integer> channels, FirstValueBytesRefAggregator.FirstValueLongSingleState state) {
    this.driverContext = driverContext;
    this.channels = channels;
    this.state = state;
  }

  public static FirstValueBytesRefAggregatorFunction create(DriverContext driverContext,
      List<Integer> channels) {
    return new FirstValueBytesRefAggregatorFunction(driverContext, channels, FirstValueBytesRefAggregator.initSingle(driverContext));
  }

  public static List<IntermediateStateDesc> intermediateStateDesc() {
    return INTERMEDIATE_STATE_DESC;
  }

  @Override
  public int intermediateBlockCount() {
    return INTERMEDIATE_STATE_DESC.size();
  }

  @Override
  public void addRawInput(Page page, BooleanVector mask) {
    if (mask.allFalse()) {
      // Entire page masked away
      return;
    }
    if (mask.allTrue()) {
      // No masking
      BytesRefBlock block = page.getBlock(channels.get(0));
      BytesRefVector vector = block.asVector();
      LongBlock timestampsBlock = page.getBlock(channels.get(1));
      LongVector timestampsVector = timestampsBlock.asVector();
      if (timestampsVector == null) {
        throw new IllegalStateException("expected @timestamp vector; but got a block");
      }
      if (vector != null) {
        addRawVector(vector, timestampsVector);
      } else {
        addRawBlock(block, timestampsVector);
      }
      return;
    }
    // Some positions masked away, others kept
    BytesRefBlock block = page.getBlock(channels.get(0));
    BytesRefVector vector = block.asVector();
    LongBlock timestampsBlock = page.getBlock(channels.get(1));
    LongVector timestampsVector = timestampsBlock.asVector();
    if (timestampsVector == null) {
      throw new IllegalStateException("expected @timestamp vector; but got a block");
    }
    if (vector != null) {
      addRawVector(vector, timestampsVector, mask);
    } else {
      addRawBlock(block, timestampsVector, mask);
    }
  }

  // Dense input: every position has exactly one non-null value.
  private void addRawVector(BytesRefVector vector, LongVector timestamps) {
    BytesRef scratch = new BytesRef();
    for (int i = 0; i < vector.getPositionCount(); i++) {
      FirstValueBytesRefAggregator.combine(state, timestamps.getLong(i), vector.getBytesRef(i, scratch));
    }
  }

  private void addRawVector(BytesRefVector vector, LongVector timestamps, BooleanVector mask) {
    BytesRef scratch = new BytesRef();
    for (int i = 0; i < vector.getPositionCount(); i++) {
      if (mask.getBoolean(i) == false) {
        continue;
      }
      FirstValueBytesRefAggregator.combine(state, timestamps.getLong(i), vector.getBytesRef(i, scratch));
    }
  }

  // Sparse input: positions may be null or multi-valued.
  private void addRawBlock(BytesRefBlock block, LongVector timestamps) {
    BytesRef scratch = new BytesRef();
    for (int p = 0; p < block.getPositionCount(); p++) {
      if (block.isNull(p)) {
        continue;
      }
      int start = block.getFirstValueIndex(p);
      int end = start + block.getValueCount(p);
      for (int i = start; i < end; i++) {
        FirstValueBytesRefAggregator.combine(state, timestamps.getLong(i), block.getBytesRef(i, scratch));
      }
    }
  }

  private void addRawBlock(BytesRefBlock block, LongVector timestamps, BooleanVector mask) {
    BytesRef scratch = new BytesRef();
    for (int p = 0; p < block.getPositionCount(); p++) {
      if (mask.getBoolean(p) == false) {
        continue;
      }
      if (block.isNull(p)) {
        continue;
      }
      int start = block.getFirstValueIndex(p);
      int end = start + block.getValueCount(p);
      for (int i = start; i < end; i++) {
        FirstValueBytesRefAggregator.combine(state, timestamps.getLong(i), block.getBytesRef(i, scratch));
      }
    }
  }

  @Override
  public void addIntermediateInput(Page page) {
    assert channels.size() == intermediateBlockCount();
    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
    Block valueUncast = page.getBlock(channels.get(0));
    if (valueUncast.areAllValuesNull()) {
      return;
    }
    BytesRefVector value = ((BytesRefBlock) valueUncast).asVector();
    assert value.getPositionCount() == 1;
    Block timestampUncast = page.getBlock(channels.get(1));
    if (timestampUncast.areAllValuesNull()) {
      return;
    }
    LongVector timestamp = ((LongBlock) timestampUncast).asVector();
    assert timestamp.getPositionCount() == 1;
    Block seenUncast = page.getBlock(channels.get(2));
    if (seenUncast.areAllValuesNull()) {
      return;
    }
    BooleanVector seen = ((BooleanBlock) seenUncast).asVector();
    assert seen.getPositionCount() == 1;
    BytesRef scratch = new BytesRef();
    FirstValueBytesRefAggregator.combineIntermediate(state, value.getBytesRef(0, scratch), timestamp.getLong(0), seen.getBoolean(0));
  }

  @Override
  public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
    state.toIntermediate(blocks, offset, driverContext);
  }

  @Override
  public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) {
    blocks[offset] = FirstValueBytesRefAggregator.evaluateFinal(state, driverContext);
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(getClass().getSimpleName()).append("[");
    sb.append("channels=").append(channels);
    sb.append("]");
    return sb.toString();
  }

  @Override
  public void close() {
    state.close();
  }
}
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;

import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.util.List;
import org.elasticsearch.compute.operator.DriverContext;

/**
 * {@link AggregatorFunctionSupplier} implementation for {@link FirstValueBytesRefAggregator}.
 * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
 */
public final class FirstValueBytesRefAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
  public FirstValueBytesRefAggregatorFunctionSupplier() {
  }

  @Override
  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
    return FirstValueBytesRefAggregatorFunction.intermediateStateDesc();
  }

  @Override
  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
    return FirstValueBytesRefGroupingAggregatorFunction.intermediateStateDesc();
  }

  @Override
  public FirstValueBytesRefAggregatorFunction aggregator(DriverContext driverContext,
      List<Integer> channels) {
    return FirstValueBytesRefAggregatorFunction.create(driverContext, channels);
  }

  @Override
  public FirstValueBytesRefGroupingAggregatorFunction groupingAggregator(
      DriverContext driverContext, List<Integer> channels) {
    return FirstValueBytesRefGroupingAggregatorFunction.create(channels, driverContext);
  }

  @Override
  public String describe() {
    return "first_value of bytes";
  }
}
b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueBytesRefGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..aec4a39bfa2ce --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueBytesRefGroupingAggregatorFunction.java @@ -0,0 +1,242 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.BytesRefVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link FirstValueBytesRefAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. 
+ */ +public final class FirstValueBytesRefGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("value", ElementType.BYTES_REF), + new IntermediateStateDesc("timestamp", ElementType.LONG), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final FirstValueBytesRefAggregator.FirstValueLongGroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public FirstValueBytesRefGroupingAggregatorFunction(List channels, + FirstValueBytesRefAggregator.FirstValueLongGroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static FirstValueBytesRefGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new FirstValueBytesRefGroupingAggregatorFunction(channels, FirstValueBytesRefAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds, + Page page) { + BytesRefBlock valuesBlock = page.getBlock(channels.get(0)); + BytesRefVector valuesVector = valuesBlock.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (valuesVector == null) { + if (valuesBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, 
timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void close() { + } + }; + } + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntVector groups, BytesRefBlock values, + LongVector timestamps) { + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstValueBytesRefAggregator.combine(state, groupId, timestamps.getLong(v), values.getBytesRef(v, scratch)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, BytesRefVector values, + LongVector timestamps) { + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstValueBytesRefAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getBytesRef(valuePosition, scratch)); + } + } + + private void addRawInput(int positionOffset, IntBlock groups, BytesRefBlock values, + LongVector timestamps) { + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; 
groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstValueBytesRefAggregator.combine(state, groupId, timestamps.getLong(v), values.getBytesRef(v, scratch)); + } + } + } + } + + private void addRawInput(int positionOffset, IntBlock groups, BytesRefVector values, + LongVector timestamps) { + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstValueBytesRefAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getBytesRef(valuePosition, scratch)); + } + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + BytesRefVector value = ((BytesRefBlock) valueUncast).asVector(); + Block timestampUncast = 
page.getBlock(channels.get(1)); + if (timestampUncast.areAllValuesNull()) { + return; + } + LongVector timestamp = ((LongBlock) timestampUncast).asVector(); + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert value.getPositionCount() == timestamp.getPositionCount() && value.getPositionCount() == seen.getPositionCount(); + BytesRef scratch = new BytesRef(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + FirstValueBytesRefAggregator.combineIntermediate(state, groupId, value.getBytesRef(groupPosition + positionOffset, scratch), timestamp.getLong(groupPosition + positionOffset), seen.getBoolean(groupPosition + positionOffset)); + } + } + + @Override + public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) { + if (input.getClass() != getClass()) { + throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass()); + } + FirstValueBytesRefAggregator.FirstValueLongGroupingState inState = ((FirstValueBytesRefGroupingAggregatorFunction) input).state; + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + FirstValueBytesRefAggregator.combineStates(state, groupId, inState, position); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + DriverContext driverContext) { + blocks[offset] = FirstValueBytesRefAggregator.evaluateFinal(state, selected, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + 
} + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueLongAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueLongAggregatorFunction.java new file mode 100644 index 0000000000000..162bef97688b8 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueLongAggregatorFunction.java @@ -0,0 +1,187 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunction} implementation for {@link FirstValueLongAggregator}. + * This class is generated. Edit {@code AggregatorImplementer} instead. 
+ */ +public final class FirstValueLongAggregatorFunction implements AggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("value", ElementType.LONG), + new IntermediateStateDesc("timestamp", ElementType.LONG), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final DriverContext driverContext; + + private final FirstValueLongAggregator.FirstValueLongSingleState state; + + private final List channels; + + public FirstValueLongAggregatorFunction(DriverContext driverContext, List channels, + FirstValueLongAggregator.FirstValueLongSingleState state) { + this.driverContext = driverContext; + this.channels = channels; + this.state = state; + } + + public static FirstValueLongAggregatorFunction create(DriverContext driverContext, + List channels) { + return new FirstValueLongAggregatorFunction(driverContext, channels, FirstValueLongAggregator.initSingle()); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + return; + } + if (mask.allTrue()) { + // No masking + LongBlock block = page.getBlock(channels.get(0)); + LongVector vector = block.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (vector != null) { + addRawVector(vector, timestampsVector); + } else { + addRawBlock(block, timestampsVector); + } + return; + } + // Some positions masked away, others kept + LongBlock block = page.getBlock(channels.get(0)); + LongVector vector = block.asVector(); + LongBlock timestampsBlock = page.getBlock(channels.get(1)); + LongVector 
timestampsVector = timestampsBlock.asVector(); + if (timestampsVector == null) { + throw new IllegalStateException("expected @timestamp vector; but got a block"); + } + if (vector != null) { + addRawVector(vector, timestampsVector, mask); + } else { + addRawBlock(block, timestampsVector, mask); + } + } + + private void addRawVector(LongVector vector, LongVector timestamps) { + for (int i = 0; i < vector.getPositionCount(); i++) { + FirstValueLongAggregator.combine(state, timestamps.getLong(i), vector.getLong(i)); + } + } + + private void addRawVector(LongVector vector, LongVector timestamps, BooleanVector mask) { + for (int i = 0; i < vector.getPositionCount(); i++) { + if (mask.getBoolean(i) == false) { + continue; + } + FirstValueLongAggregator.combine(state, timestamps.getLong(i), vector.getLong(i)); + } + } + + private void addRawBlock(LongBlock block, LongVector timestamps) { + for (int p = 0; p < block.getPositionCount(); p++) { + if (block.isNull(p)) { + continue; + } + int start = block.getFirstValueIndex(p); + int end = start + block.getValueCount(p); + for (int i = start; i < end; i++) { + FirstValueLongAggregator.combine(state, timestamps.getLong(i), block.getLong(i)); + } + } + } + + private void addRawBlock(LongBlock block, LongVector timestamps, BooleanVector mask) { + for (int p = 0; p < block.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + if (block.isNull(p)) { + continue; + } + int start = block.getFirstValueIndex(p); + int end = start + block.getValueCount(p); + for (int i = start; i < end; i++) { + FirstValueLongAggregator.combine(state, timestamps.getLong(i), block.getLong(i)); + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block valueUncast = page.getBlock(channels.get(0)); + if (valueUncast.areAllValuesNull()) { + return; + } + LongVector value = 
((LongBlock) valueUncast).asVector(); + assert value.getPositionCount() == 1; + Block timestampUncast = page.getBlock(channels.get(1)); + if (timestampUncast.areAllValuesNull()) { + return; + } + LongVector timestamp = ((LongBlock) timestampUncast).asVector(); + assert timestamp.getPositionCount() == 1; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert seen.getPositionCount() == 1; + FirstValueLongAggregator.combineIntermediate(state, value.getLong(0), timestamp.getLong(0), seen.getBoolean(0)); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { + blocks[offset] = FirstValueLongAggregator.evaluateFinal(state, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueLongAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueLongAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..d8e09b0da7242 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstValueLongAggregatorFunctionSupplier.java @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;

import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.util.List;
import org.elasticsearch.compute.operator.DriverContext;

/**
 * {@link AggregatorFunctionSupplier} implementation for {@link FirstValueLongAggregator}.
 * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
 */
public final class FirstValueLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
  public FirstValueLongAggregatorFunctionSupplier() {
  }

  @Override
  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
    return FirstValueLongAggregatorFunction.intermediateStateDesc();
  }

  @Override
  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
    return FirstValueLongGroupingAggregatorFunction.intermediateStateDesc();
  }

  @Override
  public FirstValueLongAggregatorFunction aggregator(DriverContext driverContext,
      List<Integer> channels) {
    return FirstValueLongAggregatorFunction.create(driverContext, channels);
  }

  @Override
  public FirstValueLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
      List<Integer> channels) {
    return FirstValueLongGroupingAggregatorFunction.create(channels, driverContext);
  }

  @Override
  public String describe() {
    return "first_value of longs";
  }
}
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;

import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.lang.StringBuilder;
import java.util.List;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;

/**
 * {@link GroupingAggregatorFunction} implementation for {@link FirstValueLongAggregator}.
 * This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
 */
public final class FirstValueLongGroupingAggregatorFunction implements GroupingAggregatorFunction {
  // Intermediate (partial-aggregation) page layout: the first value, its @timestamp, and a seen flag.
  private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
      new IntermediateStateDesc("value", ElementType.LONG),
      new IntermediateStateDesc("timestamp", ElementType.LONG),
      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );

  // Per-group accumulator; owned by this function and released in close().
  private final FirstValueLongAggregator.FirstValueLongGroupingState state;

  // Input channel order: channels.get(0) = values, channels.get(1) = @timestamp.
  private final List<Integer> channels;

  private final DriverContext driverContext;

  public FirstValueLongGroupingAggregatorFunction(List<Integer> channels,
      FirstValueLongAggregator.FirstValueLongGroupingState state, DriverContext driverContext) {
    this.channels = channels;
    this.state = state;
    this.driverContext = driverContext;
  }

  public static FirstValueLongGroupingAggregatorFunction create(List<Integer> channels,
      DriverContext driverContext) {
    return new FirstValueLongGroupingAggregatorFunction(channels, FirstValueLongAggregator.initGrouping(driverContext), driverContext);
  }

  public static List<IntermediateStateDesc> intermediateStateDesc() {
    return INTERMEDIATE_STATE_DESC;
  }

  @Override
  public int intermediateBlockCount() {
    return INTERMEDIATE_STATE_DESC.size();
  }

  /**
   * Picks the cheapest {@code addRawInput} overload for this page: vector path when the
   * values column has no nulls/multi-values, block path otherwise. The @timestamp column
   * must always be a vector (dense, non-null).
   */
  @Override
  public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds,
      Page page) {
    LongBlock valuesBlock = page.getBlock(channels.get(0));
    LongVector valuesVector = valuesBlock.asVector();
    LongBlock timestampsBlock = page.getBlock(channels.get(1));
    LongVector timestampsVector = timestampsBlock.asVector();
    if (timestampsVector == null) {
      throw new IllegalStateException("expected @timestamp vector; but got a block");
    }
    if (valuesVector == null) {
      if (valuesBlock.mayHaveNulls()) {
        // Null values may leave groups untouched, so the state must track which groups were seen.
        state.enableGroupIdTracking(seenGroupIds);
      }
      return new GroupingAggregatorFunction.AddInput() {
        @Override
        public void add(int positionOffset, IntBlock groupIds) {
          addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
        }

        @Override
        public void add(int positionOffset, IntVector groupIds) {
          addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
        }

        @Override
        public void close() {
        }
      };
    }
    return new GroupingAggregatorFunction.AddInput() {
      @Override
      public void add(int positionOffset, IntBlock groupIds) {
        addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
      }

      @Override
      public void add(int positionOffset, IntVector groupIds) {
        addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
      }

      @Override
      public void close() {
      }
    };
  }

  // Dense group ids, values as a block (may hold nulls / multi-values).
  private void addRawInput(int positionOffset, IntVector groups, LongBlock values,
      LongVector timestamps) {
    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
      int groupId = groups.getInt(groupPosition);
      if (values.isNull(groupPosition + positionOffset)) {
        continue;
      }
      int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
      int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
      for (int v = valuesStart; v < valuesEnd; v++) {
        // NOTE(review): timestamps is indexed by the value index v, not by the position
        // (groupPosition + positionOffset). For multi-valued `values` these differ —
        // confirm this matches the GroupingAggregatorImplementer contract.
        FirstValueLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v));
      }
    }
  }

  // Dense group ids, dense values: one combine per position.
  private void addRawInput(int positionOffset, IntVector groups, LongVector values,
      LongVector timestamps) {
    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
      int groupId = groups.getInt(groupPosition);
      var valuePosition = groupPosition + positionOffset;
      FirstValueLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition));
    }
  }

  // Group ids as a block (a position may map to several groups), values as a block.
  private void addRawInput(int positionOffset, IntBlock groups, LongBlock values,
      LongVector timestamps) {
    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
      if (groups.isNull(groupPosition)) {
        continue;
      }
      int groupStart = groups.getFirstValueIndex(groupPosition);
      int groupEnd = groupStart + groups.getValueCount(groupPosition);
      for (int g = groupStart; g < groupEnd; g++) {
        int groupId = groups.getInt(g);
        if (values.isNull(groupPosition + positionOffset)) {
          continue;
        }
        int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
        int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
        for (int v = valuesStart; v < valuesEnd; v++) {
          // NOTE(review): same value-index-vs-position question as the vector/block overload above.
          FirstValueLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v));
        }
      }
    }
  }

  // Group ids as a block, dense values.
  private void addRawInput(int positionOffset, IntBlock groups, LongVector values,
      LongVector timestamps) {
    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
      if (groups.isNull(groupPosition)) {
        continue;
      }
      int groupStart = groups.getFirstValueIndex(groupPosition);
      int groupEnd = groupStart + groups.getValueCount(groupPosition);
      for (int g = groupStart; g < groupEnd; g++) {
        int groupId = groups.getInt(g);
        var valuePosition = groupPosition + positionOffset;
        FirstValueLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition));
      }
    }
  }

  @Override
  public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
    state.enableGroupIdTracking(seenGroupIds);
  }

  /**
   * Merges a partial-aggregation page (value/timestamp/seen triple) into this state.
   * Bails out early if any of the three intermediate blocks is entirely null.
   */
  @Override
  public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
    state.enableGroupIdTracking(new SeenGroupIds.Empty());
    assert channels.size() == intermediateBlockCount();
    Block valueUncast = page.getBlock(channels.get(0));
    if (valueUncast.areAllValuesNull()) {
      return;
    }
    LongVector value = ((LongBlock) valueUncast).asVector();
    Block timestampUncast = page.getBlock(channels.get(1));
    if (timestampUncast.areAllValuesNull()) {
      return;
    }
    LongVector timestamp = ((LongBlock) timestampUncast).asVector();
    Block seenUncast = page.getBlock(channels.get(2));
    if (seenUncast.areAllValuesNull()) {
      return;
    }
    BooleanVector seen = ((BooleanBlock) seenUncast).asVector();
    assert value.getPositionCount() == timestamp.getPositionCount() && value.getPositionCount() == seen.getPositionCount();
    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
      int groupId = groups.getInt(groupPosition);
      FirstValueLongAggregator.combineIntermediate(state, groupId, value.getLong(groupPosition + positionOffset), timestamp.getLong(groupPosition + positionOffset), seen.getBoolean(groupPosition + positionOffset));
    }
  }

  // Merges one group's state from another instance of this same function (ordinal grouping path).
  @Override
  public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
    if (input.getClass() != getClass()) {
      throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
    }
    FirstValueLongAggregator.FirstValueLongGroupingState inState = ((FirstValueLongGroupingAggregatorFunction) input).state;
    state.enableGroupIdTracking(new SeenGroupIds.Empty());
    FirstValueLongAggregator.combineStates(state, groupId, inState, position);
  }

  @Override
  public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
    state.toIntermediate(blocks, offset, selected, driverContext);
  }

  @Override
  public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
      DriverContext driverContext) {
    blocks[offset] = FirstValueLongAggregator.evaluateFinal(state, selected, driverContext);
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(getClass().getSimpleName()).append("[");
    sb.append("channels=").append(channels);
    sb.append("]");
    return sb.toString();
  }

  @Override
  public void close() {
    state.close();
  }
}
0000000000000..9e2d3f9e6e8a1 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstValueBytesRefAggregator.java @@ -0,0 +1,242 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +@Aggregator( + value = { + @IntermediateState(name = "value", type = "BYTES_REF"), + @IntermediateState(name = "timestamp", type = "LONG"), + @IntermediateState(name = "seen", type = "BOOLEAN") }, + includeTimestamps = true +) +@GroupingAggregator(includeTimestamps = true) +public class FirstValueBytesRefAggregator { + + // single + + public static FirstValueLongSingleState initSingle(DriverContext driverContext) { + return new FirstValueLongSingleState(driverContext.breaker()); + } + + public static void combine(FirstValueLongSingleState state, long timestamp, BytesRef value) { + state.add(value, timestamp); + } + + public static void combineIntermediate(FirstValueLongSingleState current, BytesRef value, long timestamp, boolean seen) { + if (seen) { + 
current.add(value, timestamp); + } + } + + public static Block evaluateFinal(FirstValueLongSingleState state, DriverContext driverContext) { + return state.toFinal(driverContext); + } + + // grouping + + public static FirstValueLongGroupingState initGrouping(DriverContext driverContext) { + return new FirstValueLongGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(FirstValueLongGroupingState state, int groupId, long timestamp, BytesRef value) { + state.add(groupId, value, timestamp); + } + + public static void combineIntermediate(FirstValueLongGroupingState current, int groupId, BytesRef value, long timestamp, boolean seen) { + if (seen) { + current.add(groupId, value, timestamp); + } + } + + public static void combineStates( + FirstValueLongGroupingState state, + int groupId, + FirstValueLongGroupingState otherState, + int otherGroupId + ) { + if (otherState.hasValue(otherGroupId)) { + state.add(groupId, otherState.valueState.get(otherGroupId).bytesRefView(), otherState.timestampState.get(otherGroupId)); + } + } + + public static Block evaluateFinal(FirstValueLongGroupingState state, IntVector selected, DriverContext driverContext) { + return state.toFinal(driverContext, selected); + } + + public static class FirstValueLongSingleState implements AggregatorState { + + private final BreakingBytesRefBuilder value; + private long timestamp = Long.MAX_VALUE; + private boolean seen = false; + + public FirstValueLongSingleState(CircuitBreaker breaker) { + this.value = new BreakingBytesRefBuilder(breaker, "first_value_bytes_ref_aggregator"); + } + + public void add(BytesRef value, long timestamp) { + if (seen == false || timestamp < this.timestamp) { + this.seen = true; + this.value.grow(value.length); + this.value.setLength(value.length); + System.arraycopy(value.bytes, value.offset, this.value.bytes(), 0, value.length); + this.timestamp = timestamp; + } + } + + @Override + public void toIntermediate(Block[] blocks, int 
offset, DriverContext driverContext) { + blocks[offset] = driverContext.blockFactory().newConstantBytesRefBlockWith(value.bytesRefView(), 1); + blocks[offset + 1] = driverContext.blockFactory().newConstantLongBlockWith(timestamp, 1); + blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1); + } + + public Block toFinal(DriverContext driverContext) { + return seen + ? driverContext.blockFactory().newConstantBytesRefBlockWith(value.bytesRefView(), 1) + : driverContext.blockFactory().newConstantNullBlock(1); + } + + @Override + public void close() { + Releasables.close(value); + } + } + + public static class FirstValueLongGroupingState implements GroupingAggregatorState { + + private final BigArrays bigArrays; + private final CircuitBreaker breaker; + + private ObjectArray valueState; + private LongArray timestampState; + private BitArray seen; + + public FirstValueLongGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.bigArrays = bigArrays; + this.breaker = breaker; + valueState = bigArrays.newObjectArray(1); + timestampState = bigArrays.newLongArray(1, false); + seen = null; + } + + public void add(int groupId, BytesRef value, long timestamp) { + if (hasValue(groupId) == false || timestamp < getTimestamp(groupId)) { + ensureCapacity(groupId); + var currentBuilder = valueState.get(groupId); + if (currentBuilder == null) { + currentBuilder = new BreakingBytesRefBuilder(breaker, "values", value.length); + valueState.set(groupId, currentBuilder); + } + currentBuilder.copyBytes(value); + timestampState.set(groupId, timestamp); + if (seen != null) { + seen.set(groupId); + } + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seen) { + if (this.seen == null) { + this.seen = seen.seenGroupIds(bigArrays); + } + } + + public boolean hasValue(int groupId) { + return groupId < valueState.size() && valueState.get(groupId) != null; + } + + public long getTimestamp(int groupId) { + return groupId < 
timestampState.size() ? timestampState.get(groupId) : Long.MAX_VALUE; + } + + private void ensureCapacity(int groupId) { + if (groupId >= valueState.size()) { + valueState = bigArrays.grow(valueState, groupId + 1); + } + if (groupId >= timestampState.size()) { + long prevSize = timestampState.size(); + timestampState = bigArrays.grow(timestampState, groupId + 1); + timestampState.fill(prevSize, timestampState.size(), Long.MAX_VALUE); + } + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 3; + try ( + var valuesBuilder = driverContext.blockFactory().newBytesRefVectorBuilder(selected.getPositionCount()); + var timestampBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount()) + ) { + var emptyBytesRef = new BytesRef(); + for (int i = 0; i < selected.getPositionCount(); i++) { + int group = selected.getInt(i); + if (hasValue(group)) { + valuesBuilder.appendBytesRef(valueState.get(group).bytesRefView()); + timestampBuilder.appendLong(timestampState.get(group)); + } else { + valuesBuilder.appendBytesRef(emptyBytesRef); + timestampBuilder.appendNull(); + } + hasValueBuilder.appendBoolean(i, hasValue(group)); + } + blocks[offset] = valuesBuilder.build().asBlock(); + blocks[offset + 1] = timestampBuilder.build(); + blocks[offset + 2] = hasValueBuilder.build().asBlock(); + } + } + + public Block toFinal(DriverContext driverContext, IntVector selected) { + if (seen != null) { + try (var builder = driverContext.blockFactory().newBytesRefBlockBuilder(selected.getPositionCount())) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int group = selected.getInt(i); + if (seen.get(group)) { + builder.appendBytesRef(valueState.get(group).bytesRefView()); + } else { + builder.appendNull(); + } + } + return builder.build(); + 
} + } else { + try (var builder = driverContext.blockFactory().newBytesRefVectorBuilder(selected.getPositionCount())) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int group = selected.getInt(i); + builder.appendBytesRef(valueState.get(group).bytesRefView()); + } + return builder.build().asBlock(); + } + } + } + + @Override + public void close() { + for (int i = 0; i < valueState.size(); i++) { + Releasables.closeWhileHandlingException(valueState.get(i)); + } + Releasables.close(valueState, timestampState, seen); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstValueLongAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstValueLongAggregator.java new file mode 100644 index 0000000000000..13d7474b99987 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstValueLongAggregator.java @@ -0,0 +1,224 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.core.Releasables; + +@Aggregator( + value = { + @IntermediateState(name = "value", type = "LONG"), + @IntermediateState(name = "timestamp", type = "LONG"), + @IntermediateState(name = "seen", type = "BOOLEAN") }, + includeTimestamps = true +) +@GroupingAggregator(includeTimestamps = true) +public class FirstValueLongAggregator { + + // single + + public static FirstValueLongSingleState initSingle() { + return new FirstValueLongSingleState(); + } + + public static void combine(FirstValueLongSingleState state, long timestamp, long value) { + state.add(value, timestamp); + } + + public static void combineIntermediate(FirstValueLongSingleState current, long value, long timestamp, boolean seen) { + if (seen) { + current.add(value, timestamp); + } + } + + public static Block evaluateFinal(FirstValueLongSingleState state, DriverContext driverContext) { + return state.toFinal(driverContext); + } + + // grouping + + public static FirstValueLongGroupingState initGrouping(DriverContext driverContext) { + return new FirstValueLongGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(FirstValueLongGroupingState state, int groupId, long timestamp, long value) { + state.add(groupId, value, timestamp); + } + + public static void combineIntermediate(FirstValueLongGroupingState current, int groupId, long value, long timestamp, boolean seen) { 
+ if (seen) { + current.add(groupId, value, timestamp); + } + } + + public static void combineStates( + FirstValueLongGroupingState state, + int groupId, + FirstValueLongGroupingState otherState, + int otherGroupId + ) { + if (otherState.hasValue(otherGroupId)) { + state.add(groupId, otherState.valueState.get(otherGroupId), otherState.timestampState.get(otherGroupId)); + } + } + + public static Block evaluateFinal(FirstValueLongGroupingState state, IntVector selected, DriverContext driverContext) { + return state.toFinal(driverContext, selected); + } + + public static class FirstValueLongSingleState implements AggregatorState { + + private long value = 0; + private long timestamp = Long.MAX_VALUE; + private boolean seen = false; + + public void add(long value, long timestamp) { + if (seen == false || timestamp < this.timestamp) { + this.seen = true; + this.value = value; + this.timestamp = timestamp; + } + } + + @Override + public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + blocks[offset] = driverContext.blockFactory().newConstantLongBlockWith(value, 1); + blocks[offset + 1] = driverContext.blockFactory().newConstantLongBlockWith(timestamp, 1); + blocks[offset + 2] = driverContext.blockFactory().newConstantBooleanBlockWith(seen, 1); + } + + public Block toFinal(DriverContext driverContext) { + return seen + ? 
driverContext.blockFactory().newConstantLongBlockWith(value, 1) + : driverContext.blockFactory().newConstantNullBlock(1); + } + + @Override + public void close() {} + } + + public static class FirstValueLongGroupingState implements GroupingAggregatorState { + + private final BigArrays bigArrays; + private final CircuitBreaker breaker; + + private LongArray valueState; + private LongArray timestampState; + private BitArray seen; + + public FirstValueLongGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + this.bigArrays = bigArrays; + this.breaker = breaker; + valueState = bigArrays.newLongArray(1, false); + timestampState = bigArrays.newLongArray(1, false); + seen = null; + } + + public void add(int groupId, long value, long timestamp) { + if (hasValue(groupId) == false || timestamp < getTimestamp(groupId)) { + ensureCapacity(groupId); + valueState.set(groupId, value); + timestampState.set(groupId, timestamp); + if (seen != null) { + seen.set(groupId); + } + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seen) { + if (this.seen == null) { + this.seen = seen.seenGroupIds(bigArrays); + } + } + + public boolean hasValue(int groupId) { + return groupId < valueState.size() && valueState.get(groupId) != Long.MAX_VALUE; + } + + public long getTimestamp(int groupId) { + return groupId < timestampState.size() ? 
timestampState.get(groupId) : Long.MAX_VALUE; + } + + private void ensureCapacity(int groupId) { + if (groupId >= valueState.size()) { + long prevSize1 = valueState.size(); + valueState = bigArrays.grow(valueState, groupId + 1); + valueState.fill(prevSize1, valueState.size(), Long.MAX_VALUE); + } + if (groupId >= timestampState.size()) { + long prevSize2 = timestampState.size(); + timestampState = bigArrays.grow(timestampState, groupId + 1); + timestampState.fill(prevSize2, timestampState.size(), Long.MAX_VALUE); + } + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 3; + try ( + var valuesBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var timestampBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount()); + var hasValueBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount()) + ) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int group = selected.getInt(i); + if (hasValue(group)) { + valuesBuilder.appendLong(valueState.get(group)); + timestampBuilder.appendLong(timestampState.get(group)); + } else { + valuesBuilder.appendNull(); + timestampBuilder.appendNull(); + } + hasValueBuilder.appendBoolean(i, hasValue(group)); + } + blocks[offset] = valuesBuilder.build(); + blocks[offset + 1] = timestampBuilder.build(); + blocks[offset + 2] = hasValueBuilder.build().asBlock(); + } + } + + public Block toFinal(DriverContext driverContext, IntVector selected) { + if (seen != null) { + try (var builder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int group = selected.getInt(i); + if (seen.get(group)) { + builder.appendLong(valueState.get(group)); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } else { + try (var builder 
= driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int group = selected.getInt(i); + builder.appendLong(valueState.get(group)); + } + return builder.build(); + } + } + } + + @Override + public void close() { + Releasables.close(valueState, timestampState, seen); + } + } +} diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/first_last.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/first_last.csv-spec new file mode 100644 index 0000000000000..48b72593545c7 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/first_last.csv-spec @@ -0,0 +1,48 @@ +first_no_grouping +required_capability: fn_first_last +FROM sample_data +| STATS event_duration = first_value(event_duration), message = first_value(message) +; + +event_duration:long | message:keyword +3450233 | Connected to 10.1.0.3 +; + + +first_no_grouping_with_timestamp +required_capability: fn_first_last +FROM sample_data +| STATS event_duration = first_value(event_duration, @timestamp), message = first_value(message, @timestamp) +; + +event_duration:long | message:keyword +3450233 | Connected to 10.1.0.3 +; + + +first_grouped +required_capability: fn_first_last +FROM sample_data +| STATS event_duration = first_value(event_duration), message = first_value(message) BY hourly = BUCKET(@timestamp, 1 hour) +| SORT hourly +| KEEP hourly, event_duration, message +; + +hourly:date | event_duration:long | message:keyword +2023-10-23T12:00:00.000Z | 3450233 | Connected to 10.1.0.3 +2023-10-23T13:00:00.000Z | 1232382 | Disconnected +; + + +first_grouped_with_timestamp +required_capability: fn_first_last +FROM sample_data +| STATS event_duration = first_value(event_duration, @timestamp), message = first_value(message, @timestamp) BY hourly = BUCKET(@timestamp, 1 hour) +| SORT hourly +| KEEP hourly, event_duration, message +; + +hourly:date | event_duration:long | message:keyword 
+2023-10-23T12:00:00.000Z | 3450233 | Connected to 10.1.0.3 +2023-10-23T13:00:00.000Z | 1232382 | Disconnected +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 980e5402ea560..4d557e4d6fc38 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -204,6 +204,11 @@ public enum Cap { */ FN_ROUND_UL_FIXES, + /** + * Support for function {@code FIRST} and {@code LAST}. + */ + FN_FIRST_LAST, + /** * All functions that take TEXT should never emit TEXT, only KEYWORD. #114334 */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index 0535beab3e780..009f849a66cb7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct; +import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstValue; import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; import org.elasticsearch.xpack.esql.expression.function.aggregate.Median; import org.elasticsearch.xpack.esql.expression.function.aggregate.MedianAbsoluteDeviation; @@ -293,6 +294,8 @@ private static FunctionDefinition[][] functions() { def(Avg.class, uni(Avg::new), "avg"), def(Count.class, uni(Count::new), "count"), 
def(CountDistinct.class, bi(CountDistinct::new), "count_distinct"), + def(FirstValue.class, bi(FirstValue::new), "first_value"), + // def(Last.class, uni(Last::new), "last"), def(Max.class, uni(Max::new), "max"), def(Median.class, uni(Median::new), "median"), def(MedianAbsoluteDeviation.class, uni(MedianAbsoluteDeviation::new), "median_absolute_deviation"), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index db1d2a9e6f254..322da944a3217 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -18,6 +18,7 @@ public static List getNamedWriteables() { Avg.ENTRY, Count.ENTRY, CountDistinct.ENTRY, + FirstValue.ENTRY, Max.ENTRY, Median.ENTRY, MedianAbsoluteDeviation.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValue.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValue.java new file mode 100644 index 0000000000000..c35700d010622 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValue.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
 */

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.FirstValueBytesRefAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.FirstValueLongAggregatorFunctionSupplier;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.TypeResolutions;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.planner.ToAggregator;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;

/**
 * The {@code FIRST_VALUE} aggregate function: picks the value with the earliest timestamp
 * per group. The timestamp argument is optional; when omitted the constructor falls back
 * to an unresolved {@code @timestamp} attribute.
 */
public class FirstValue extends AggregateFunction implements OptionalArgument, ToAggregator, SurrogateExpression {

    // NOTE(review): the wire name is "First" while the class is FirstValue — presumably
    // intentional as the registered serialization name, but confirm before release; it
    // cannot be changed once shipped.
    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "First", FirstValue::new);

    // Supported field types and the (timestamp-aware) aggregator each maps to. All
    // string-like types share the BytesRef implementation.
    private static final Map<DataType, Supplier<AggregatorFunctionSupplier>> SUPPLIERS = Map.ofEntries(
        Map.entry(DataType.LONG, FirstValueLongAggregatorFunctionSupplier::new),
        Map.entry(DataType.KEYWORD, FirstValueBytesRefAggregatorFunctionSupplier::new),
        Map.entry(DataType.TEXT, FirstValueBytesRefAggregatorFunctionSupplier::new),
        Map.entry(DataType.SEMANTIC_TEXT, FirstValueBytesRefAggregatorFunctionSupplier::new),
        Map.entry(DataType.VERSION, FirstValueBytesRefAggregatorFunctionSupplier::new)
    );

    @FunctionInfo(
        returnType = { "long", "keyword" },
        description = "Picks the first value (by the timestamp) of the series.",
        type = FunctionType.AGGREGATE,
        examples = {}
    )
    public FirstValue(
        Source source,
        @Param(name = "field", type = { "long", "keyword" }) Expression field,
        @Nullable @Param(
            optional = true,
            name = "timestamp",
            type = "long",
            description = "Timestamp field to determine values order."
        ) Expression timestamp
    ) {
        // A missing timestamp argument defaults to the @timestamp attribute, resolved later
        // by the analyzer; the filter defaults to TRUE (no filtering).
        this(source, field, Literal.TRUE, timestamp != null ? List.of(timestamp) : List.of(new UnresolvedAttribute(source, "@timestamp")));
    }

    private FirstValue(StreamInput in) throws IOException {
        // Wire order: source, field, filter, parameters — must mirror AggregateFunction's writeTo.
        this(
            Source.readFrom((PlanStreamInput) in),
            in.readNamedWriteable(Expression.class),
            in.readNamedWriteable(Expression.class),
            in.readNamedWriteableCollectionAsList(Expression.class)
        );
    }

    private FirstValue(Source source, Expression field, Expression filter, List<Expression> params) {
        super(source, field, filter, params);
    }

    @Override
    public String getWriteableName() {
        return ENTRY.name;
    }

    @Override
    protected NodeInfo<FirstValue> info() {
        return NodeInfo.create(this, FirstValue::new, field(), filter(), parameters());
    }

    @Override
    public Expression replaceChildren(List<Expression> newChildren) {
        // Child order mirrors the private constructor: field, filter, then parameters.
        return new FirstValue(source(), newChildren.get(0), newChildren.get(1), newChildren.subList(2, newChildren.size()));
    }

    @Override
    public AggregateFunction withFilter(Expression filter) {
        return new FirstValue(source(), field(), filter, parameters());
    }

    // TODO ensure BY is always timestamp/long

    @Override
    public DataType dataType() {
        // TEXT/SEMANTIC_TEXT surface as KEYWORD, per the "TEXT never emits TEXT" rule.
        return field().dataType().noText();
    }

    @Override
    protected TypeResolution resolveType() {
        return TypeResolutions.isType(
            field(),
            SUPPLIERS::containsKey,
            sourceText(),
            DEFAULT,
            "representable except unsigned_long and spatial types"
        );
    }

    @Override
    public Expression surrogate() {
        // A foldable (constant) field is its own first value — skip the aggregation entirely.
        // TODO can this be optimized even further?
        return field().foldable() ? field() : null;
    }

    @Override
    public AggregatorFunctionSupplier supplier() {
        var type = field().dataType();
        var supplier = SUPPLIERS.get(type);
        if (supplier == null) {
            // If the type checking did its job, this should never happen
            throw EsqlIllegalArgumentException.illegalDataType(type);
        }
        return supplier.get();
    }

    /** The timestamp expression ordering the values, or null when none was recorded. */
    public Expression by() {
        return parameters().isEmpty() ? null : parameters().getFirst();
    }
}
NamedWriteableRegistry(ExpressionWritables.getNamedWriteables()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValueSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValueSerializationTests.java new file mode 100644 index 0000000000000..fc7d1cd0645e7 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValueSerializationTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests; + +import java.io.IOException; + +public class FirstValueSerializationTests extends AbstractExpressionSerializationTests { + + @Override + protected FirstValue createTestInstance() { + return new FirstValue(randomSource(), randomChild(), randomBoolean() ? 
randomChild() : null); + } + + @Override + protected FirstValue mutateInstance(FirstValue instance) throws IOException { + var source = instance.source(); + var field = instance.field(); + var precision = instance.by(); + if (randomBoolean()) { + field = mutateExpression(field); + } else { + precision = mutateNullableExpression(precision); + } + return new FirstValue(source, field, precision); + } + + @Override + protected boolean alwaysEmptySource() { + return true; + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValueTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValueTests.java new file mode 100644 index 0000000000000..a9b8878c41bb7 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstValueTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.AbstractAggregationTestCase; +import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.equalTo; + +public class FirstValueTests extends AbstractAggregationTestCase { + public FirstValueTests(@Name("TestCase") Supplier testCaseSupplier) { + this.testCase = testCaseSupplier.get(); + } + + @ParametersFactory + public static Iterable parameters() { + var suppliers = new ArrayList(); + + suppliers.add( + new TestCaseSupplier( + "first(long, long): long", + List.of(DataType.LONG, DataType.LONG), + () -> new TestCaseSupplier.TestCase( + List.of( + TestCaseSupplier.TypedData.multiRow(List.of(1000L, 999L), DataType.LONG, "field"), + TestCaseSupplier.TypedData.multiRow(List.of(2025L, 2026L), DataType.LONG, "timestamp") + ), + "FirstValue[field=Attribute[channel=0], timestamp=Attribute[channel=1]]", + DataType.LONG, + equalTo(1000L) + ) + ) + ); + suppliers.add( + new TestCaseSupplier( + "first(keyword, long): keyword", + List.of(DataType.KEYWORD, DataType.LONG), + () -> new TestCaseSupplier.TestCase( + List.of( + TestCaseSupplier.TypedData.multiRow(List.of(new BytesRef("1000"), new BytesRef("999")), DataType.KEYWORD, "field"), + TestCaseSupplier.TypedData.multiRow(List.of(2025L, 2026L), DataType.LONG, "timestamp") + ), + "FirstValue[field=Attribute[channel=0], timestamp=Attribute[channel=1]]", + DataType.KEYWORD, + equalTo(new BytesRef("1000")) + ) 
+ ) + ); + + return parameterSuppliersFromTypedDataWithDefaultChecksNoErrors(suppliers); + } + + @Override + protected Expression build(Source source, List args) { + return new FirstValue(source, args.get(0), args.get(1)); + } +}