Skip to content

Commit 502d522

Browse files
committed
Compact serialization of blocks
1 parent 6f0756a commit 502d522

File tree

30 files changed

+131
-229
lines changed

30 files changed

+131
-229
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ static TransportVersion def(int id) {
184184
public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_023_0_00);
185185
public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_024_0_00);
186186
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_025_0_00);
187+
public static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE = def(9_026_0_00);
187188

188189
/*
189190
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/FloatBlock.java

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99

1010
import org.apache.lucene.util.Accountable;
1111
import org.apache.lucene.util.RamUsageEstimator;
12-
import org.elasticsearch.common.io.stream.NamedWriteable;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.io.stream.Writeable;
1314
import org.elasticsearch.common.unit.ByteSizeValue;
1415
import org.elasticsearch.core.RefCounted;
1516
import org.elasticsearch.core.Releasable;
1617
import org.elasticsearch.core.ReleasableIterator;
1718
import org.elasticsearch.core.Releasables;
1819
import org.elasticsearch.index.mapper.BlockLoader;
1920

21+
import java.io.IOException;
22+
2023
/**
2124
* A Block is a columnar representation of homogenous data. It has a position (row) count, and
2225
* various data retrieval methods for accessing the underlying data that is stored at a given
@@ -35,7 +38,7 @@
3538
* <p> Block are immutable and can be passed between threads as long as no two threads hold a reference to
3639
* the same block at the same time.
3740
*/
38-
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, RefCounted, Releasable {
41+
public interface Block extends Accountable, BlockLoader.Block, Writeable, RefCounted, Releasable {
3942
/**
4043
* The maximum number of values that can be added to one position via lookup.
4144
* TODO maybe make this everywhere?
@@ -329,6 +332,30 @@ static Block[] buildAll(Block.Builder... builders) {
329332
}
330333
}
331334

335+
/**
336+
* Writes only the data of the block to a stream output.
337+
* This method should be used when the type of the block is known during reading.
338+
*/
339+
void writeTo(StreamOutput out) throws IOException;
340+
341+
/**
342+
* Writes the type of the block followed by the block data to a stream output.
343+
* This should be paired with {@link #readTypedBlock(BlockStreamInput)}
344+
*/
345+
static void writeTypedBlock(Block block, StreamOutput out) throws IOException {
346+
block.elementType().writeTo(out);
347+
block.writeTo(out);
348+
}
349+
350+
/**
351+
* Reads the block type and then the block data from a stream input
352+
* This should be paired with {@link #writeTypedBlock(Block, StreamOutput)}
353+
*/
354+
static Block readTypedBlock(BlockStreamInput in) throws IOException {
355+
ElementType elementType = ElementType.readFrom(in);
356+
return elementType.reader.readBlock(in);
357+
}
358+
332359
/**
333360
* Serialization type for blocks: 0 and 1 replace false/true used in pre-8.14
334361
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockWritables.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.compute.data;
99

1010
import org.apache.lucene.util.Accountable;
11-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1211
import org.elasticsearch.common.io.stream.StreamInput;
1312
import org.elasticsearch.common.io.stream.StreamOutput;
1413
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -42,8 +41,6 @@ public CompositeBlock(Block[] blocks) {
4241
}
4342
}
4443

45-
static NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Block.class, "CompositeBlock", CompositeBlock::readFrom);
46-
4744
@Override
4845
public Vector asVector() {
4946
return null;
@@ -208,18 +205,14 @@ public long ramBytesUsed() {
208205
return Arrays.stream(blocks).mapToLong(Accountable::ramBytesUsed).sum();
209206
}
210207

211-
@Override
212-
public String getWriteableName() {
213-
return "CompositeBlock";
214-
}
215-
216208
static Block readFrom(StreamInput in) throws IOException {
217209
final int numBlocks = in.readVInt();
218210
boolean success = false;
219211
final Block[] blocks = new Block[numBlocks];
212+
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
220213
try {
221214
for (int b = 0; b < numBlocks; b++) {
222-
blocks[b] = in.readNamedWriteable(Block.class);
215+
blocks[b] = Block.readTypedBlock(blockStreamInput);
223216
}
224217
CompositeBlock result = new CompositeBlock(blocks);
225218
success = true;
@@ -235,7 +228,7 @@ static Block readFrom(StreamInput in) throws IOException {
235228
public void writeTo(StreamOutput out) throws IOException {
236229
out.writeVInt(blocks.length);
237230
for (Block block : blocks) {
238-
out.writeNamedWriteable(block);
231+
Block.writeTypedBlock(block, out);
239232
}
240233
}
241234

0 commit comments

Comments
 (0)