Skip to content

Commit b6f876f

Browse files
authored
Serialize big array blocks (#106373)
Similar to ArrayBlocks, this change serializes the underlying structure of BigArrayBlocks.
1 parent 925a9a3 commit b6f876f

File tree

13 files changed

+156
-0
lines changed

13 files changed

+156
-0
lines changed

docs/changelog/106373.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 106373
2+
summary: Serialize big array blocks
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ static TransportVersion def(int id) {
147147
public static final TransportVersion ESQL_EXTENDED_ENRICH_INPUT_TYPE = def(8_607_00_0);
148148
public static final TransportVersion ESQL_SERIALIZE_BIG_VECTOR = def(8_608_00_0);
149149
public static final TransportVersion AGGS_EXCLUDED_DELETED_DOCS = def(8_609_00_0);
150+
public static final TransportVersion ESQL_SERIALIZE_BIG_ARRAY = def(8_610_00_0);
150151

151152
/*
152153
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBigArrayBlock.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
package org.elasticsearch.compute.data;
99

1010
import org.apache.lucene.util.RamUsageEstimator;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.common.util.BitArray;
1213
import org.elasticsearch.core.Releasables;
1314

15+
import java.io.IOException;
1416
import java.util.BitSet;
1517

1618
/**
@@ -54,6 +56,29 @@ private BooleanBigArrayBlock(
5456
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
5557
}
5658

59+
static BooleanBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
60+
final SubFields sub = new SubFields(blockFactory, in);
61+
BooleanBigArrayVector vector = null;
62+
boolean success = false;
63+
try {
64+
vector = BooleanBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
65+
var block = new BooleanBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
66+
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
67+
success = true;
68+
return block;
69+
} finally {
70+
if (success == false) {
71+
Releasables.close(vector);
72+
blockFactory.adjustBreaker(-sub.bytesReserved);
73+
}
74+
}
75+
}
76+
77+
void writeArrayBlock(StreamOutput out) throws IOException {
78+
writeSubFields(out);
79+
vector.writeArrayVector(vector.getPositionCount(), out);
80+
}
81+
5782
@Override
5883
public BooleanVector asVector() {
5984
return null;

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private static BooleanBlock readFrom(BlockStreamInput in) throws IOException {
5555
case SERIALIZE_BLOCK_VALUES -> BooleanBlock.readValues(in);
5656
case SERIALIZE_BLOCK_VECTOR -> BooleanVector.readFrom(in.blockFactory(), in).asBlock();
5757
case SERIALIZE_BLOCK_ARRAY -> BooleanArrayBlock.readArrayBlock(in.blockFactory(), in);
58+
case SERIALIZE_BLOCK_BIG_ARRAY -> BooleanBigArrayBlock.readArrayBlock(in.blockFactory(), in);
5859
default -> {
5960
assert false : "invalid block serialization type " + serializationType;
6061
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
9192
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof BooleanArrayBlock b) {
9293
out.writeByte(SERIALIZE_BLOCK_ARRAY);
9394
b.writeArrayBlock(out);
95+
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof BooleanBigArrayBlock b) {
96+
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
97+
b.writeArrayBlock(out);
9498
} else {
9599
out.writeByte(SERIALIZE_BLOCK_VALUES);
96100
BooleanBlock.writeValues(this, out);

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBigArrayBlock.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
package org.elasticsearch.compute.data;
99

1010
import org.apache.lucene.util.RamUsageEstimator;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.common.util.DoubleArray;
1213
import org.elasticsearch.core.Releasables;
1314

15+
import java.io.IOException;
1416
import java.util.BitSet;
1517

1618
/**
@@ -54,6 +56,29 @@ private DoubleBigArrayBlock(
5456
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
5557
}
5658

59+
static DoubleBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
60+
final SubFields sub = new SubFields(blockFactory, in);
61+
DoubleBigArrayVector vector = null;
62+
boolean success = false;
63+
try {
64+
vector = DoubleBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
65+
var block = new DoubleBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
66+
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
67+
success = true;
68+
return block;
69+
} finally {
70+
if (success == false) {
71+
Releasables.close(vector);
72+
blockFactory.adjustBreaker(-sub.bytesReserved);
73+
}
74+
}
75+
}
76+
77+
void writeArrayBlock(StreamOutput out) throws IOException {
78+
writeSubFields(out);
79+
vector.writeArrayVector(vector.getPositionCount(), out);
80+
}
81+
5782
@Override
5883
public DoubleVector asVector() {
5984
return null;

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private static DoubleBlock readFrom(BlockStreamInput in) throws IOException {
5555
case SERIALIZE_BLOCK_VALUES -> DoubleBlock.readValues(in);
5656
case SERIALIZE_BLOCK_VECTOR -> DoubleVector.readFrom(in.blockFactory(), in).asBlock();
5757
case SERIALIZE_BLOCK_ARRAY -> DoubleArrayBlock.readArrayBlock(in.blockFactory(), in);
58+
case SERIALIZE_BLOCK_BIG_ARRAY -> DoubleBigArrayBlock.readArrayBlock(in.blockFactory(), in);
5859
default -> {
5960
assert false : "invalid block serialization type " + serializationType;
6061
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
9192
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof DoubleArrayBlock b) {
9293
out.writeByte(SERIALIZE_BLOCK_ARRAY);
9394
b.writeArrayBlock(out);
95+
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof DoubleBigArrayBlock b) {
96+
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
97+
b.writeArrayBlock(out);
9498
} else {
9599
out.writeByte(SERIALIZE_BLOCK_VALUES);
96100
DoubleBlock.writeValues(this, out);

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBigArrayBlock.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
package org.elasticsearch.compute.data;
99

1010
import org.apache.lucene.util.RamUsageEstimator;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.common.util.IntArray;
1213
import org.elasticsearch.core.Releasables;
1314

15+
import java.io.IOException;
1416
import java.util.BitSet;
1517

1618
/**
@@ -54,6 +56,29 @@ private IntBigArrayBlock(
5456
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
5557
}
5658

59+
static IntBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
60+
final SubFields sub = new SubFields(blockFactory, in);
61+
IntBigArrayVector vector = null;
62+
boolean success = false;
63+
try {
64+
vector = IntBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
65+
var block = new IntBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
66+
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
67+
success = true;
68+
return block;
69+
} finally {
70+
if (success == false) {
71+
Releasables.close(vector);
72+
blockFactory.adjustBreaker(-sub.bytesReserved);
73+
}
74+
}
75+
}
76+
77+
void writeArrayBlock(StreamOutput out) throws IOException {
78+
writeSubFields(out);
79+
vector.writeArrayVector(vector.getPositionCount(), out);
80+
}
81+
5782
@Override
5883
public IntVector asVector() {
5984
return null;

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private static IntBlock readFrom(BlockStreamInput in) throws IOException {
5555
case SERIALIZE_BLOCK_VALUES -> IntBlock.readValues(in);
5656
case SERIALIZE_BLOCK_VECTOR -> IntVector.readFrom(in.blockFactory(), in).asBlock();
5757
case SERIALIZE_BLOCK_ARRAY -> IntArrayBlock.readArrayBlock(in.blockFactory(), in);
58+
case SERIALIZE_BLOCK_BIG_ARRAY -> IntBigArrayBlock.readArrayBlock(in.blockFactory(), in);
5859
default -> {
5960
assert false : "invalid block serialization type " + serializationType;
6061
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
9192
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof IntArrayBlock b) {
9293
out.writeByte(SERIALIZE_BLOCK_ARRAY);
9394
b.writeArrayBlock(out);
95+
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof IntBigArrayBlock b) {
96+
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
97+
b.writeArrayBlock(out);
9498
} else {
9599
out.writeByte(SERIALIZE_BLOCK_VALUES);
96100
IntBlock.writeValues(this, out);

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBigArrayBlock.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
package org.elasticsearch.compute.data;
99

1010
import org.apache.lucene.util.RamUsageEstimator;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.common.util.LongArray;
1213
import org.elasticsearch.core.Releasables;
1314

15+
import java.io.IOException;
1416
import java.util.BitSet;
1517

1618
/**
@@ -54,6 +56,29 @@ private LongBigArrayBlock(
5456
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
5557
}
5658

59+
static LongBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
60+
final SubFields sub = new SubFields(blockFactory, in);
61+
LongBigArrayVector vector = null;
62+
boolean success = false;
63+
try {
64+
vector = LongBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
65+
var block = new LongBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
66+
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
67+
success = true;
68+
return block;
69+
} finally {
70+
if (success == false) {
71+
Releasables.close(vector);
72+
blockFactory.adjustBreaker(-sub.bytesReserved);
73+
}
74+
}
75+
}
76+
77+
void writeArrayBlock(StreamOutput out) throws IOException {
78+
writeSubFields(out);
79+
vector.writeArrayVector(vector.getPositionCount(), out);
80+
}
81+
5782
@Override
5883
public LongVector asVector() {
5984
return null;

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private static LongBlock readFrom(BlockStreamInput in) throws IOException {
5555
case SERIALIZE_BLOCK_VALUES -> LongBlock.readValues(in);
5656
case SERIALIZE_BLOCK_VECTOR -> LongVector.readFrom(in.blockFactory(), in).asBlock();
5757
case SERIALIZE_BLOCK_ARRAY -> LongArrayBlock.readArrayBlock(in.blockFactory(), in);
58+
case SERIALIZE_BLOCK_BIG_ARRAY -> LongBigArrayBlock.readArrayBlock(in.blockFactory(), in);
5859
default -> {
5960
assert false : "invalid block serialization type " + serializationType;
6061
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
9192
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof LongArrayBlock b) {
9293
out.writeByte(SERIALIZE_BLOCK_ARRAY);
9394
b.writeArrayBlock(out);
95+
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof LongBigArrayBlock b) {
96+
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
97+
b.writeArrayBlock(out);
9498
} else {
9599
out.writeByte(SERIALIZE_BLOCK_VALUES);
96100
LongBlock.writeValues(this, out);

0 commit comments

Comments
 (0)