Skip to content

Commit 6818965

Browse files
nik9000ywangd
authored andcommitted
ESQL: Claim transport version for partitioning (elastic#127775)
Claims a transport version to backport elastic#125739.
1 parent ed40590 commit 6818965

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ static TransportVersion def(int id) {
173173
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19 = def(8_841_0_26);
174174
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO_BACKPORT_8_19 = def(8_841_0_27);
175175
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT_8_19 = def(8_841_0_28);
176+
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_29);
176177
public static final TransportVersion V_9_0_0 = def(9_000_0_09);
177178
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
178179
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.stream.Collectors;
4646

4747
import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING;
48+
import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING_8_19;
4849

4950
public abstract class LuceneOperator extends SourceOperator {
5051
private static final Logger logger = LogManager.getLogger(LuceneOperator.class);
@@ -352,7 +353,7 @@ private Status(LuceneOperator operator) {
352353
} else {
353354
rowsEmitted = 0;
354355
}
355-
partitioningStrategies = in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)
356+
partitioningStrategies = serializeShardPartitioning(in.getTransportVersion())
356357
? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom)
357358
: Map.of();
358359
}
@@ -376,11 +377,15 @@ public void writeTo(StreamOutput out) throws IOException {
376377
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
377378
out.writeVLong(rowsEmitted);
378379
}
379-
if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) {
380+
if (serializeShardPartitioning(out.getTransportVersion())) {
380381
out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable);
381382
}
382383
}
383384

385+
private static boolean serializeShardPartitioning(TransportVersion version) {
386+
return version.onOrAfter(ESQL_REPORT_SHARD_PARTITIONING) || version.isPatchFrom(ESQL_REPORT_SHARD_PARTITIONING_8_19);
387+
}
388+
384389
@Override
385390
public String getWriteableName() {
386391
return ENTRY.name;

0 commit comments

Comments
 (0)