Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ interface BlockFactory {
SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count);

// TODO support non-singleton ords

AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count);
}

/**
Expand Down Expand Up @@ -501,4 +503,10 @@ interface SingletonOrdinalsBuilder extends Builder {
*/
SingletonOrdinalsBuilder appendOrd(int value);
}

interface AggregateDoubleMetricBuilder extends Builder {

AggregateDoubleMetricBuilder append(double min, double max, double sum, int valueCount);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public SingletonOrdsBuilder appendOrd(int value) {
}
return new SingletonOrdsBuilder();
}

@Override
public BlockLoader.AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count) {
return null;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin {

public static final FeatureFlag SEMANTIC_TEXT_FEATURE_FLAG = new FeatureFlag("esql_semantic_text");
public static final FeatureFlag AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG = new FeatureFlag("esql_aggregate_metric_double");
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ public enum DataType {
* loaded from the index and ESQL will load these fields as strings without their attached
* chunks or embeddings.
*/
SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize());
SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize()),

AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").unknownSize());

/**
* Types that are actively being built. These types are not returned
Expand All @@ -316,7 +318,8 @@ public enum DataType {
* check that sending them to a function produces a sane error message.
*/
public static final Map<DataType, FeatureFlag> UNDER_CONSTRUCTION = Map.ofEntries(
Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG)
Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG),
Map.entry(AGGREGATE_METRIC_DOUBLE, EsqlCorePlugin.AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG)
);

private final String typeName;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;

import java.util.List;

/**
* {@link AggregatorFunctionSupplier} implementation for {@link MaxDoubleAggregator} with aggregated double metric field.
*/
public final class MaxAggregatedMetricDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier {

private static final int MAX_COMPOSITE_SUBBLOCK = 1;

private final List<Integer> channels;

public MaxAggregatedMetricDoubleAggregatorFunctionSupplier(List<Integer> channels) {
this.channels = channels;
}

@Override
public AggregatorFunction aggregator(DriverContext driverContext) {
// Copied from MaxDoubleAggregatorFunction, see change comments for actual differences:
final DoubleState state = new DoubleState(MaxDoubleAggregator.init());
return new AggregatorFunction() {

static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
new IntermediateStateDesc("max", ElementType.DOUBLE),
new IntermediateStateDesc("seen", ElementType.BOOLEAN)
);

public static List<IntermediateStateDesc> intermediateStateDesc() {
return INTERMEDIATE_STATE_DESC;
}

@Override
public int intermediateBlockCount() {
return INTERMEDIATE_STATE_DESC.size();
}

@Override
public void addRawInput(Page page, BooleanVector mask) {
if (mask.allFalse()) {
// Entire page masked away
return;
}
if (mask.allTrue()) {
// No masking
// CHANGE:
CompositeBlock compositeBlock = page.getBlock(channels.get(0));
DoubleBlock block = compositeBlock.getBlock(MAX_COMPOSITE_SUBBLOCK);
// END CHANGE:
DoubleVector vector = block.asVector();
if (vector != null) {
addRawVector(vector);
} else {
addRawBlock(block);
}
return;
}
// Some positions masked away, others kept
// CHANGE:
CompositeBlock compositeBlock = page.getBlock(channels.get(0));
DoubleBlock block = compositeBlock.getBlock(MAX_COMPOSITE_SUBBLOCK);
// END CHANGE:
DoubleVector vector = block.asVector();
if (vector != null) {
addRawVector(vector, mask);
} else {
addRawBlock(block, mask);
}
}

private void addRawVector(DoubleVector vector) {
state.seen(true);
for (int i = 0; i < vector.getPositionCount(); i++) {
state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), vector.getDouble(i)));
}
}

private void addRawVector(DoubleVector vector, BooleanVector mask) {
state.seen(true);
for (int i = 0; i < vector.getPositionCount(); i++) {
if (mask.getBoolean(i) == false) {
continue;
}
state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), vector.getDouble(i)));
}
}

private void addRawBlock(DoubleBlock block) {
for (int p = 0; p < block.getPositionCount(); p++) {
if (block.isNull(p)) {
continue;
}
state.seen(true);
int start = block.getFirstValueIndex(p);
int end = start + block.getValueCount(p);
for (int i = start; i < end; i++) {
state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), block.getDouble(i)));
}
}
}

private void addRawBlock(DoubleBlock block, BooleanVector mask) {
for (int p = 0; p < block.getPositionCount(); p++) {
if (mask.getBoolean(p) == false) {
continue;
}
if (block.isNull(p)) {
continue;
}
state.seen(true);
int start = block.getFirstValueIndex(p);
int end = start + block.getValueCount(p);
for (int i = start; i < end; i++) {
state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), block.getDouble(i)));
}
}
}

@Override
public void addIntermediateInput(Page page) {
assert channels.size() == intermediateBlockCount();
assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
Block maxUncast = page.getBlock(channels.get(0));
if (maxUncast.areAllValuesNull()) {
return;
}
DoubleVector max = ((DoubleBlock) maxUncast).asVector();
assert max.getPositionCount() == 1;
Block seenUncast = page.getBlock(channels.get(1));
if (seenUncast.areAllValuesNull()) {
return;
}
BooleanVector seen = ((BooleanBlock) seenUncast).asVector();
assert seen.getPositionCount() == 1;
if (seen.getBoolean(0)) {
state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), max.getDouble(0)));
state.seen(true);
}
}

@Override
public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) {
state.toIntermediate(blocks, offset, driverContext);
}

@Override
public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) {
if (state.seen() == false) {
blocks[offset] = driverContext.blockFactory().newConstantNullBlock(1);
return;
}
blocks[offset] = driverContext.blockFactory().newConstantDoubleBlockWith(state.doubleValue(), 1);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName()).append("[");
sb.append("channels=").append(channels);
sb.append("]");
return sb.toString();
}

@Override
public void close() {
state.close();
}

};
}

@Override
public MaxDoubleGroupingAggregatorFunction groupingAggregator(DriverContext driverContext) {
// TODO:
throw new UnsupportedOperationException("grouping aggregator is not supported yet");
// return MaxDoubleGroupingAggregatorFunction.create(channels, driverContext);
}

@Override
public String describe() {
return "max of aggregated doubles";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.index.mapper.BlockLoader;

public class AggregateDoubleMetricBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateDoubleMetricBuilder {

private final DoubleBlockBuilder minBuilder;
private final DoubleBlockBuilder maxBuilder;
private final DoubleBlockBuilder sumBuilder;
private final IntBlockBuilder countBuilder;

public AggregateDoubleMetricBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
minBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
maxBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
sumBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
countBuilder = new IntBlockBuilder(estimatedSize, blockFactory);
}

@Override
protected int valuesLength() {
return minBuilder.valuesLength();
}

@Override
protected void growValuesArray(int newSize) {
minBuilder.growValuesArray(newSize);
maxBuilder.growValuesArray(newSize);
sumBuilder.growValuesArray(newSize);
countBuilder.growValuesArray(newSize);
}

@Override
protected int elementSize() {
return minBuilder.elementSize() + maxBuilder.elementSize() + sumBuilder.elementSize() + countBuilder.elementSize();
}

@Override
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
CompositeBlock composite = (CompositeBlock) block;
minBuilder.copyFrom(composite.getBlock(0), beginInclusive, endExclusive);
maxBuilder.copyFrom(composite.getBlock(1), beginInclusive, endExclusive);
sumBuilder.copyFrom(composite.getBlock(2), beginInclusive, endExclusive);
countBuilder.copyFrom(composite.getBlock(3), beginInclusive, endExclusive);
return this;
}

@Override
public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
minBuilder.mvOrdering(mvOrdering);
maxBuilder.mvOrdering(mvOrdering);
sumBuilder.mvOrdering(mvOrdering);
countBuilder.mvOrdering(mvOrdering);
return this;
}

@Override
public Block build() {
Block[] blocks = new Block[4];
blocks[0] = minBuilder.build();
blocks[1] = maxBuilder.build();
blocks[2] = sumBuilder.build();
blocks[3] = countBuilder.build();
return new CompositeBlock(blocks);
}

@Override
public BlockLoader.AggregateDoubleMetricBuilder append(double min, double max, double sum, int valueCount) {
minBuilder.appendDouble(min);
maxBuilder.appendDouble(max);
sumBuilder.appendDouble(sum);
countBuilder.appendInt(valueCount);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ public Block newConstantNullBlock(int positions) {
return b;
}

public AggregateDoubleMetricBlockBuilder newAggregatedDoubleMetricBlockBuilder(int estimatedSize) {
return new AggregateDoubleMetricBlockBuilder(estimatedSize, this);
}

/**
* Returns the maximum number of bytes that a Block should be backed by a primitive array before switching to using BigArrays.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ private static Object valueAtOffset(Block block, int offset) {
case BOOLEAN -> ((BooleanBlock) block).getBoolean(offset);
case BYTES_REF -> BytesRef.deepCopyOf(((BytesRefBlock) block).getBytesRef(offset, new BytesRef()));
case DOUBLE -> ((DoubleBlock) block).getDouble(offset);
// TODO: include all aggregate double metric sub fields. Currently hardcoded to max sub field, which is always the second block:
case AGGREGATED_DOUBLE_METRIC -> ((DoubleBlock) ((CompositeBlock) block).getBlock(1)).getDouble(offset);
case FLOAT -> ((FloatBlock) block).getFloat(offset);
case INT -> ((IntBlock) block).getInt(offset);
case LONG -> ((LongBlock) block).getLong(offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,30 @@ public int getPositionCount() {

@Override
public int getTotalValueCount() {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].getTotalValueCount();
}

@Override
public int getFirstValueIndex(int position) {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].getFirstValueIndex(position);
}

@Override
public int getValueCount(int position) {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].getValueCount(position);
}

@Override
public boolean isNull(int position) {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].isNull(position);
}

@Override
Expand Down
Loading