Skip to content

Commit 6ce22f3

Browse files
committed
convert AggMetricBlock and CompositeBlock in readTypedBlock, todos, and moving functions
1 parent 865a57b commit 6ce22f3

File tree

7 files changed

+49
-61
lines changed

7 files changed

+49
-61
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ public static AggregateMetricDoubleBlock fromCompositeBlock(CompositeBlock block
5555
return new AggregateMetricDoubleBlock(min, max, sum, count);
5656
}
5757

58+
public static CompositeBlock toCompositeBlock(AggregateMetricDoubleBlock block) {
59+
final Block[] blocks = new Block[4];
60+
blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = block.minBlock();
61+
blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = block.maxBlock();
62+
blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = block.sumBlock();
63+
blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = block.countBlock();
64+
return new CompositeBlock(blocks);
65+
}
66+
5867
@Override
5968
protected void closeInternal() {
6069
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
@@ -193,18 +202,42 @@ public ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeVa
193202

194203
@Override
195204
public MvOrdering mvOrdering() {
205+
// TODO: determine based on sub-blocks
196206
return MvOrdering.UNORDERED;
197207
}
198208

199209
@Override
200210
public Block expand() {
211+
// TODO: support
201212
throw new UnsupportedOperationException("AggregateMetricDoubleBlock");
202213
}
203214

204215
@Override
205216
public void writeTo(StreamOutput out) throws IOException {
206217
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
207-
Block.writeTypedBlock(block, out);
218+
block.writeTo(out);
219+
}
220+
}
221+
222+
public static Block readFrom(StreamInput in) throws IOException {
223+
boolean success = false;
224+
DoubleBlock minBlock = null;
225+
DoubleBlock maxBlock = null;
226+
DoubleBlock sumBlock = null;
227+
IntBlock countBlock = null;
228+
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
229+
try {
230+
minBlock = DoubleBlock.readFrom(blockStreamInput);
231+
maxBlock = DoubleBlock.readFrom(blockStreamInput);
232+
sumBlock = DoubleBlock.readFrom(blockStreamInput);
233+
countBlock = IntBlock.readFrom(blockStreamInput);
234+
AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
235+
success = true;
236+
return result;
237+
} finally {
238+
if (success == false) {
239+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
240+
}
208241
}
209242
}
210243

@@ -236,28 +269,6 @@ public int hashCode() {
236269
);
237270
}
238271

239-
public static Block readFrom(StreamInput in) throws IOException {
240-
boolean success = false;
241-
DoubleBlock minBlock = null;
242-
DoubleBlock maxBlock = null;
243-
DoubleBlock sumBlock = null;
244-
IntBlock countBlock = null;
245-
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
246-
try {
247-
minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
248-
maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
249-
sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
250-
countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput);
251-
AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
252-
success = true;
253-
return result;
254-
} finally {
255-
if (success == false) {
256-
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
257-
}
258-
}
259-
}
260-
261272
public DoubleBlock minBlock() {
262273
return minBlock;
263274
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -345,13 +345,11 @@ static Block[] buildAll(Block.Builder... builders) {
345345
*/
346346
static void writeTypedBlock(Block block, StreamOutput out) throws IOException {
347347
if (out.getTransportVersion().before(TransportVersions.AGGREGATE_METRIC_DOUBLE_BLOCK)
348-
&& block.elementType() == ElementType.AGGREGATE_METRIC_DOUBLE) {
349-
ElementType.COMPOSITE.writeTo(out);
350-
CompositeBlock.fromAggregateMetricDoubleBlock((AggregateMetricDoubleBlock) block).writeTo(out);
351-
} else {
352-
block.elementType().writeTo(out);
353-
block.writeTo(out);
348+
&& block instanceof AggregateMetricDoubleBlock aggregateMetricDoubleBlock) {
349+
block = AggregateMetricDoubleBlock.toCompositeBlock(aggregateMetricDoubleBlock);
354350
}
351+
block.elementType().writeTo(out);
352+
block.writeTo(out);
355353
}
356354

357355
/**
@@ -360,7 +358,12 @@ static void writeTypedBlock(Block block, StreamOutput out) throws IOException {
360358
*/
361359
static Block readTypedBlock(BlockStreamInput in) throws IOException {
362360
ElementType elementType = ElementType.readFrom(in);
363-
return elementType.reader.readBlock(in);
361+
Block block = elementType.reader.readBlock(in);
362+
if (in.getTransportVersion().before(TransportVersions.AGGREGATE_METRIC_DOUBLE_BLOCK)
363+
&& block instanceof CompositeBlock compositeBlock) {
364+
block = AggregateMetricDoubleBlock.fromCompositeBlock(compositeBlock);
365+
}
366+
return block;
364367
}
365368

366369
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,15 +167,6 @@ public CompositeBlock filter(int... positions) {
167167
}
168168
}
169169

170-
public static CompositeBlock fromAggregateMetricDoubleBlock(AggregateMetricDoubleBlock block) {
171-
final Block[] blocks = new Block[4];
172-
blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = block.minBlock();
173-
blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = block.maxBlock();
174-
blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = block.sumBlock();
175-
blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = block.countBlock();
176-
return new CompositeBlock(blocks);
177-
}
178-
179170
@Override
180171
public Block keepMask(BooleanVector mask) {
181172
CompositeBlock result = null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.compute.data.Block;
1212
import org.elasticsearch.compute.data.BooleanBlock;
1313
import org.elasticsearch.compute.data.BytesRefBlock;
14-
import org.elasticsearch.compute.data.CompositeBlock;
1514
import org.elasticsearch.compute.data.DocBlock;
1615
import org.elasticsearch.compute.data.DoubleBlock;
1716
import org.elasticsearch.compute.data.ElementType;
@@ -27,17 +26,12 @@ interface ValueExtractor {
2726
void writeValue(BreakingBytesRefBuilder values, int position);
2827

2928
static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, boolean inKey, Block block) {
30-
if (false == (elementType == block.elementType()
31-
|| ElementType.NULL == block.elementType()
32-
|| ElementType.COMPOSITE == block.elementType())) {
29+
if (false == (elementType == block.elementType() || ElementType.NULL == block.elementType())) {
3330
// While this maybe should be an IllegalArgumentException, it's important to throw an exception that causes a 500 response.
3431
// If we reach here, that's a bug. Arguably, the operators are in an illegal state because the layout doesn't match the
3532
// actual pages.
3633
throw new IllegalStateException("Expected [" + elementType + "] but was [" + block.elementType() + "]");
3734
}
38-
if (elementType == ElementType.AGGREGATE_METRIC_DOUBLE && block.elementType() == ElementType.COMPOSITE) {
39-
block = AggregateMetricDoubleBlock.fromCompositeBlock((CompositeBlock) block);
40-
}
4135
return switch (block.elementType()) {
4236
case BOOLEAN -> ValueExtractorForBoolean.extractorFor(encoder, inKey, (BooleanBlock) block);
4337
case BYTES_REF -> ValueExtractorForBytesRef.extractorFor(encoder, inKey, (BytesRefBlock) block);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@
1010
import org.apache.lucene.util.BytesRef;
1111
import org.elasticsearch.common.collect.Iterators;
1212
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
13-
import org.elasticsearch.compute.data.AggregateMetricDoubleBlock;
1413
import org.elasticsearch.compute.data.Block;
15-
import org.elasticsearch.compute.data.CompositeBlock;
1614
import org.elasticsearch.compute.data.Page;
1715
import org.elasticsearch.xcontent.ToXContent;
1816
import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
19-
import org.elasticsearch.xpack.esql.core.type.DataType;
2017

2118
import java.util.Collections;
2219
import java.util.Iterator;
@@ -108,9 +105,6 @@ static Iterator<? extends ToXContent> rowValues(List<ColumnInfoImpl> columns, Li
108105
final PositionToXContent[] toXContents = new PositionToXContent[columnCount];
109106
for (int column = 0; column < columnCount; column++) {
110107
Block block = page.getBlock(column);
111-
if (columns.get(column).type() == DataType.AGGREGATE_METRIC_DOUBLE && block instanceof CompositeBlock compositeBlock) {
112-
block = AggregateMetricDoubleBlock.fromCompositeBlock(compositeBlock);
113-
}
114108
toXContents[column] = PositionToXContent.positionToXContent(columns.get(column), block, scratch);
115109
}
116110
return Iterators.forRange(0, page.getPositionCount(), position -> (builder, params) -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.compute.data.AggregateMetricDoubleBlock;
1414
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
1515
import org.elasticsearch.compute.data.Block;
16-
import org.elasticsearch.compute.data.CompositeBlock;
1716
import org.elasticsearch.compute.data.Page;
1817
import org.elasticsearch.compute.operator.DriverContext;
1918
import org.elasticsearch.compute.operator.EvalOperator;
@@ -146,10 +145,9 @@ public Block eval(Page page) {
146145
return block;
147146
}
148147
try {
149-
AggregateMetricDoubleBlock aggBlock = block instanceof AggregateMetricDoubleBlock
150-
? (AggregateMetricDoubleBlock) block
151-
: AggregateMetricDoubleBlock.fromCompositeBlock((CompositeBlock) block);
152-
Block resultBlock = aggBlock.getMetricBlock(((Number) subfieldIndex.fold(FoldContext.small())).intValue());
148+
Block resultBlock = ((AggregateMetricDoubleBlock) block).getMetricBlock(
149+
((Number) subfieldIndex.fold(FoldContext.small())).intValue()
150+
);
153151
resultBlock.incRef();
154152
return resultBlock;
155153
} finally {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringFromAggregateMetricDoubleEvaluator.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.compute.data.AggregateMetricDoubleBlock;
1212
import org.elasticsearch.compute.data.Block;
1313
import org.elasticsearch.compute.data.BytesRefBlock;
14-
import org.elasticsearch.compute.data.CompositeBlock;
1514
import org.elasticsearch.compute.data.Vector;
1615
import org.elasticsearch.compute.operator.DriverContext;
1716
import org.elasticsearch.compute.operator.EvalOperator;
@@ -44,9 +43,7 @@ private static BytesRef evalValue(AggregateMetricDoubleBlock aggBlock, int index
4443

4544
@Override
4645
public Block evalBlock(Block b) {
47-
AggregateMetricDoubleBlock block = b instanceof AggregateMetricDoubleBlock
48-
? (AggregateMetricDoubleBlock) b
49-
: AggregateMetricDoubleBlock.fromCompositeBlock((CompositeBlock) b);
46+
AggregateMetricDoubleBlock block = (AggregateMetricDoubleBlock) b;
5047
int positionCount = block.getPositionCount();
5148
try (BytesRefBlock.Builder builder = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
5249
for (int p = 0; p < positionCount; p++) {

0 commit comments

Comments
 (0)