Skip to content

Commit 2ccfd46

Browse files
committed
introduce AggregateMetricDoubleVector?
1 parent 227de7d commit 2ccfd46

File tree

3 files changed

+122
-31
lines changed

3 files changed

+122
-31
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,69 @@ public TransportVersion getMinimalSupportedVersion() {
205205
}
206206

207207
}
208+
209+
public static class AggregateMetricDoubleVectorBuilder extends AbstractBlockBuilder {
210+
private final DoubleBlockBuilder valuesBuilder;
211+
212+
public AggregateMetricDoubleVectorBuilder(int estimatedSize, BlockFactory blockFactory) {
213+
super(blockFactory);
214+
valuesBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
215+
}
216+
217+
@Override
218+
protected int valuesLength() {
219+
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
220+
}
221+
222+
@Override
223+
protected void growValuesArray(int newSize) {
224+
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
225+
}
226+
227+
@Override
228+
protected int elementSize() {
229+
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
230+
}
231+
232+
@Override
233+
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
234+
// TODO
235+
return null;
236+
}
237+
238+
@Override
239+
public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
240+
// TODO
241+
return null;
242+
}
243+
244+
public void appendValue(double value) {
245+
valuesBuilder.appendDouble(value);
246+
}
247+
248+
@Override
249+
public Block build() {
250+
Block[] blocks = new Block[4];
251+
Block block = null;
252+
ConstantIntVector countVector = null;
253+
boolean success = false;
254+
try {
255+
finish();
256+
block = valuesBuilder.build();
257+
countVector = new ConstantIntVector(1, block.getPositionCount(), blockFactory);
258+
blocks[Metric.MIN.getIndex()] = block;
259+
blocks[Metric.MAX.getIndex()] = block;
260+
blocks[Metric.SUM.getIndex()] = block;
261+
blocks[Metric.COUNT.getIndex()] = countVector.asBlock();
262+
CompositeBlock compositeBlock = new CompositeBlock(blocks);
263+
success = true;
264+
return compositeBlock;
265+
} finally {
266+
if (success == false) {
267+
Releasables.closeExpectNoException(block, countVector);
268+
}
269+
}
270+
}
271+
272+
}
208273
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@ public AggregateMetricDoubleBlockBuilder newAggregateMetricDoubleBlockBuilder(in
436436
return new AggregateMetricDoubleBlockBuilder(estimatedSize, this);
437437
}
438438

439+
public AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder newAggregateMetricDoubleVectorBuilder(int estimatedSize) {
440+
return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder(estimatedSize, this);
441+
}
442+
439443
public final Block newConstantAggregateMetricDoubleBlock(
440444
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral value,
441445
int positions

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java

Lines changed: 53 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
1313
import org.elasticsearch.compute.data.Block;
1414
import org.elasticsearch.compute.data.DoubleBlock;
15+
import org.elasticsearch.compute.data.DoubleVector;
1516
import org.elasticsearch.compute.data.IntBlock;
1617
import org.elasticsearch.compute.data.LongBlock;
1718
import org.elasticsearch.compute.data.Page;
19+
import org.elasticsearch.compute.data.Vector;
1820
import org.elasticsearch.compute.operator.DriverContext;
1921
import org.elasticsearch.compute.operator.EvalOperator;
2022
import org.elasticsearch.core.Releasables;
@@ -130,40 +132,60 @@ public EvalOperator.ExpressionEvaluator get(DriverContext context) {
130132
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
131133

132134
return new EvalOperator.ExpressionEvaluator() {
135+
private Block evalBlock(Block block) {
136+
int positionCount = block.getPositionCount();
137+
DoubleBlock doubleBlock = (DoubleBlock) block;
138+
try (
139+
AggregateMetricDoubleBlockBuilder result = context.blockFactory()
140+
.newAggregateMetricDoubleBlockBuilder(positionCount)
141+
) {
142+
CompensatedSum sum = new CompensatedSum();
143+
for (int p = 0; p < positionCount; p++) {
144+
int valueCount = doubleBlock.getValueCount(p);
145+
int start = doubleBlock.getFirstValueIndex(p);
146+
int end = start + valueCount;
147+
if (valueCount == 0) {
148+
result.appendNull();
149+
continue;
150+
}
151+
double min = Double.POSITIVE_INFINITY;
152+
double max = Double.NEGATIVE_INFINITY;
153+
for (int i = start; i < end; i++) {
154+
double current = doubleBlock.getDouble(i);
155+
min = Math.min(min, current);
156+
max = Math.max(max, current);
157+
sum.add(current);
158+
}
159+
result.min().appendDouble(min);
160+
result.max().appendDouble(max);
161+
result.sum().appendDouble(sum.value());
162+
result.count().appendInt(valueCount);
163+
sum.reset(0, 0);
164+
}
165+
return result.build();
166+
}
167+
}
168+
169+
private Block evalVector(Vector vector) {
170+
int positionCount = vector.getPositionCount();
171+
DoubleVector doubleVector = (DoubleVector) vector;
172+
try (
173+
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleVectorBuilder builder = context.blockFactory()
174+
.newAggregateMetricDoubleVectorBuilder(positionCount)
175+
) {
176+
for (int p = 0; p < positionCount; p++) {
177+
double value = doubleVector.getDouble(p);
178+
builder.appendValue(value);
179+
}
180+
return builder.build();
181+
}
182+
}
183+
133184
@Override
134185
public Block eval(Page page) {
135186
try (Block block = eval.eval(page)) {
136-
int positionCount = block.getPositionCount();
137-
DoubleBlock doubleBlock = (DoubleBlock) block;
138-
try (
139-
AggregateMetricDoubleBlockBuilder result = context.blockFactory()
140-
.newAggregateMetricDoubleBlockBuilder(positionCount)
141-
) {
142-
CompensatedSum sum = new CompensatedSum();
143-
for (int p = 0; p < positionCount; p++) {
144-
int valueCount = doubleBlock.getValueCount(p);
145-
int start = doubleBlock.getFirstValueIndex(p);
146-
int end = start + valueCount;
147-
if (valueCount == 0) {
148-
result.appendNull();
149-
continue;
150-
}
151-
double min = Double.POSITIVE_INFINITY;
152-
double max = Double.NEGATIVE_INFINITY;
153-
for (int i = start; i < end; i++) {
154-
double current = doubleBlock.getDouble(i);
155-
min = Math.min(min, current);
156-
max = Math.max(max, current);
157-
sum.add(current);
158-
}
159-
result.min().appendDouble(min);
160-
result.max().appendDouble(max);
161-
result.sum().appendDouble(sum.value());
162-
result.count().appendInt(valueCount);
163-
sum.reset(0, 0);
164-
}
165-
return result.build();
166-
}
187+
Vector vector = block.asVector();
188+
return vector == null ? evalBlock(block) : evalVector(vector);
167189
}
168190
}
169191

0 commit comments

Comments
 (0)