Skip to content

Commit 5b62b70

Browse files
authored
Handle serialization of null blocks in AggregateMetricDoubleBlock (#138539) (#138615)
Backport of #138539 to 9.2 If a sub-block of an aggregate_metric_double block is a ConstantNullBlock, serialization fails because DoubleBlock.readFrom or IntBlock.readFrom are not compatible with ConstantNullBlock.writeTo. Relates #126653 Closes #135863
1 parent 8975916 commit 5b62b70

File tree

6 files changed

+93
-7
lines changed

6 files changed

+93
-7
lines changed

docs/changelog/138539.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138539
2+
summary: Handle serialization of null blocks in `AggregateMetricDoubleBlock`
3+
area: ES|QL
4+
type: bug
5+
issues: []
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9227000,9185010
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
index_created_transport_version,9185009
1+
aggregate_metric_double_typed_block,9185010
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
index_created_transport_version,9221000
1+
aggregate_metric_double_typed_block,9227000

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.common.io.stream.StreamInput;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
1213
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -19,6 +20,8 @@
1920
import java.util.stream.Stream;
2021

2122
public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock {
23+
public static final TransportVersion WRITE_TYPED_BLOCK = TransportVersion.fromName("aggregate_metric_double_typed_block");
24+
2225
private final DoubleBlock minBlock;
2326
private final DoubleBlock maxBlock;
2427
private final DoubleBlock sumBlock;
@@ -236,7 +239,11 @@ public AggregateMetricDoubleBlock expand() {
236239
@Override
237240
public void writeTo(StreamOutput out) throws IOException {
238241
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
239-
block.writeTo(out);
242+
if (out.getTransportVersion().supports(WRITE_TYPED_BLOCK)) {
243+
Block.writeTypedBlock(block, out);
244+
} else {
245+
block.writeTo(out);
246+
}
240247
}
241248
}
242249

@@ -248,10 +255,17 @@ public static Block readFrom(StreamInput in) throws IOException {
248255
IntBlock countBlock = null;
249256
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
250257
try {
251-
minBlock = DoubleBlock.readFrom(blockStreamInput);
252-
maxBlock = DoubleBlock.readFrom(blockStreamInput);
253-
sumBlock = DoubleBlock.readFrom(blockStreamInput);
254-
countBlock = IntBlock.readFrom(blockStreamInput);
258+
if (in.getTransportVersion().supports(WRITE_TYPED_BLOCK)) {
259+
minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
260+
maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
261+
sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
262+
countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput);
263+
} else {
264+
minBlock = DoubleBlock.readFrom(blockStreamInput);
265+
maxBlock = DoubleBlock.readFrom(blockStreamInput);
266+
sumBlock = DoubleBlock.readFrom(blockStreamInput);
267+
countBlock = IntBlock.readFrom(blockStreamInput);
268+
}
255269
AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock);
256270
success = true;
257271
return result;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,72 @@ public void testCompositeBlock() throws Exception {
440440
}
441441
}
442442

443+
public void testAggregateMetricDouble() throws IOException {
444+
final int positionCount = randomIntBetween(1, 1000);
445+
DoubleBlock minBlock = (DoubleBlock) RandomBlock.randomBlock(
446+
blockFactory,
447+
randomFrom(ElementType.DOUBLE, ElementType.NULL),
448+
positionCount,
449+
true,
450+
0,
451+
1,
452+
0,
453+
0
454+
).block();
455+
456+
DoubleBlock maxBlock = (DoubleBlock) RandomBlock.randomBlock(
457+
blockFactory,
458+
randomFrom(ElementType.DOUBLE, ElementType.NULL),
459+
positionCount,
460+
true,
461+
0,
462+
1,
463+
0,
464+
0
465+
).block();
466+
467+
DoubleBlock suBlock = (DoubleBlock) RandomBlock.randomBlock(
468+
blockFactory,
469+
randomFrom(ElementType.DOUBLE, ElementType.NULL),
470+
positionCount,
471+
true,
472+
0,
473+
1,
474+
0,
475+
0
476+
).block();
477+
478+
IntBlock countBlock = (IntBlock) RandomBlock.randomBlock(
479+
blockFactory,
480+
randomFrom(ElementType.INT, ElementType.NULL),
481+
positionCount,
482+
true,
483+
0,
484+
1,
485+
0,
486+
0
487+
).block();
488+
489+
try (var origBlock = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, suBlock, countBlock)) {
490+
try (
491+
AggregateMetricDoubleBlock deserBlock = serializeDeserializeBlockWithVersion(
492+
origBlock,
493+
TransportVersionUtils.randomVersionBetween(
494+
random(),
495+
AggregateMetricDoubleArrayBlock.WRITE_TYPED_BLOCK,
496+
TransportVersion.current()
497+
)
498+
)
499+
) {
500+
assertThat(deserBlock.minBlock(), equalTo(minBlock));
501+
assertThat(deserBlock.minBlock(), equalTo(minBlock));
502+
assertThat(deserBlock.minBlock(), equalTo(minBlock));
503+
assertThat(deserBlock.minBlock(), equalTo(minBlock));
504+
EqualsHashCodeTestUtils.checkEqualsAndHashCode(deserBlock, unused -> deserBlock);
505+
}
506+
}
507+
}
508+
443509
static BytesRef randomBytesRef() {
444510
return new BytesRef(randomAlphaOfLengthBetween(0, 10));
445511
}

0 commit comments

Comments
 (0)