Skip to content

Commit 2ba29f0

Browse files
committed
introduce AggregateMetricDoubleVector?
1 parent 87ac9d6 commit 2ba29f0

File tree

3 files changed

+123
-33
lines changed

3 files changed

+123
-33
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,69 @@ public TransportVersion getMinimalSupportedVersion() {
205205
}
206206

207207
}
208+
209+
public static class AggregateMetricDoubleVectorBuilder extends AbstractBlockBuilder {
210+
private final DoubleBlockBuilder valuesBuilder;
211+
212+
public AggregateMetricDoubleVectorBuilder(int estimatedSize, BlockFactory blockFactory) {
213+
super(blockFactory);
214+
valuesBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
215+
}
216+
217+
@Override
218+
protected int valuesLength() {
219+
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
220+
}
221+
222+
@Override
223+
protected void growValuesArray(int newSize) {
224+
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
225+
}
226+
227+
@Override
228+
protected int elementSize() {
229+
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
230+
}
231+
232+
@Override
233+
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
234+
// TODO
235+
return null;
236+
}
237+
238+
@Override
239+
public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
240+
// TODO
241+
return null;
242+
}
243+
244+
public void appendValue(double value) {
245+
valuesBuilder.appendDouble(value);
246+
}
247+
248+
@Override
249+
public Block build() {
250+
Block[] blocks = new Block[4];
251+
Block block = null;
252+
ConstantIntVector countVector = null;
253+
boolean success = false;
254+
try {
255+
finish();
256+
block = valuesBuilder.build();
257+
countVector = new ConstantIntVector(1, block.getPositionCount(), blockFactory);
258+
blocks[Metric.MIN.getIndex()] = block;
259+
blocks[Metric.MAX.getIndex()] = block;
260+
blocks[Metric.SUM.getIndex()] = block;
261+
blocks[Metric.COUNT.getIndex()] = countVector.asBlock();
262+
CompositeBlock compositeBlock = new CompositeBlock(blocks);
263+
success = true;
264+
return compositeBlock;
265+
} finally {
266+
if (success == false) {
267+
Releasables.closeExpectNoException(block, countVector);
268+
}
269+
}
270+
}
271+
272+
}
208273
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@ public AggregateMetricDoubleBlockBuilder newAggregateMetricDoubleBlockBuilder(in
436436
return new AggregateMetricDoubleBlockBuilder(estimatedSize, this);
437437
}
438438

439+
public AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder newAggregateMetricDoubleVectorBuilder(int estimatedSize) {
440+
return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder(estimatedSize, this);
441+
}
442+
439443
public final Block newConstantAggregateMetricDoubleBlock(
440444
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral value,
441445
int positions

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
1313
import org.elasticsearch.compute.data.Block;
1414
import org.elasticsearch.compute.data.DoubleBlock;
15+
import org.elasticsearch.compute.data.DoubleVector;
1516
import org.elasticsearch.compute.data.IntBlock;
1617
import org.elasticsearch.compute.data.LongBlock;
1718
import org.elasticsearch.compute.data.Page;
19+
import org.elasticsearch.compute.data.Vector;
1820
import org.elasticsearch.compute.operator.DriverContext;
1921
import org.elasticsearch.compute.operator.EvalOperator;
2022
import org.elasticsearch.core.Releasables;
@@ -25,7 +27,6 @@
2527
import org.elasticsearch.xpack.esql.core.type.DataType;
2628
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
2729
import org.elasticsearch.xpack.esql.expression.function.Param;
28-
import org.elasticsearch.xpack.esql.expression.function.scalar.math.Cast;
2930
import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
3031

3132
import java.io.IOException;
@@ -131,40 +132,60 @@ public EvalOperator.ExpressionEvaluator get(DriverContext context) {
131132
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
132133

133134
return new EvalOperator.ExpressionEvaluator() {
135+
private Block evalBlock(Block block) {
136+
int positionCount = block.getPositionCount();
137+
DoubleBlock doubleBlock = (DoubleBlock) block;
138+
try (
139+
AggregateMetricDoubleBlockBuilder result = context.blockFactory()
140+
.newAggregateMetricDoubleBlockBuilder(positionCount)
141+
) {
142+
CompensatedSum sum = new CompensatedSum();
143+
for (int p = 0; p < positionCount; p++) {
144+
int valueCount = doubleBlock.getValueCount(p);
145+
int start = doubleBlock.getFirstValueIndex(p);
146+
int end = start + valueCount;
147+
if (valueCount == 0) {
148+
result.appendNull();
149+
continue;
150+
}
151+
double min = Double.POSITIVE_INFINITY;
152+
double max = Double.NEGATIVE_INFINITY;
153+
for (int i = start; i < end; i++) {
154+
double current = doubleBlock.getDouble(i);
155+
min = Math.min(min, current);
156+
max = Math.max(max, current);
157+
sum.add(current);
158+
}
159+
result.min().appendDouble(min);
160+
result.max().appendDouble(max);
161+
result.sum().appendDouble(sum.value());
162+
result.count().appendInt(valueCount);
163+
sum.reset(0, 0);
164+
}
165+
return result.build();
166+
}
167+
}
168+
169+
private Block evalVector(Vector vector) {
170+
int positionCount = vector.getPositionCount();
171+
DoubleVector doubleVector = (DoubleVector) vector;
172+
try (
173+
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder builder = context.blockFactory()
174+
.newAggregateMetricDoubleVectorBuilder(positionCount)
175+
) {
176+
for (int p = 0; p < positionCount; p++) {
177+
double value = doubleVector.getDouble(p);
178+
builder.appendValue(value);
179+
}
180+
return builder.build();
181+
}
182+
}
183+
134184
@Override
135185
public Block eval(Page page) {
136186
try (Block block = eval.eval(page)) {
137-
int positionCount = block.getPositionCount();
138-
DoubleBlock doubleBlock = (DoubleBlock) block;
139-
try (
140-
AggregateMetricDoubleBlockBuilder result = context.blockFactory()
141-
.newAggregateMetricDoubleBlockBuilder(positionCount)
142-
) {
143-
CompensatedSum sum = new CompensatedSum();
144-
for (int p = 0; p < positionCount; p++) {
145-
int valueCount = doubleBlock.getValueCount(p);
146-
int start = doubleBlock.getFirstValueIndex(p);
147-
int end = start + valueCount;
148-
if (valueCount == 0) {
149-
result.appendNull();
150-
continue;
151-
}
152-
double min = Double.POSITIVE_INFINITY;
153-
double max = Double.NEGATIVE_INFINITY;
154-
for (int i = start; i < end; i++) {
155-
double current = doubleBlock.getDouble(i);
156-
min = Math.min(min, current);
157-
max = Math.max(max, current);
158-
sum.add(current);
159-
}
160-
result.min().appendDouble(min);
161-
result.max().appendDouble(max);
162-
result.sum().appendDouble(sum.value());
163-
result.count().appendInt(valueCount);
164-
sum.reset(0, 0);
165-
}
166-
return result.build();
167-
}
187+
Vector vector = block.asVector();
188+
return vector == null ? evalBlock(block) : evalVector(vector);
168189
}
169190
}
170191

@@ -175,7 +196,7 @@ public void close() {
175196

176197
@Override
177198
public String toString() {
178-
return "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + eval + "]";
199+
return "ToAggregateMetricDoubleEvaluator[field=" + eval + "]";
179200
}
180201
};
181202
}

0 commit comments

Comments
 (0)