From d4dd90715b553da4f097727251e71041eea91d72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 21 Jul 2025 14:27:47 +0200 Subject: [PATCH] ESQL: Added Sample operator NamedWritable to plugin (#131541) `SampleOperator.Status` wasn't declared as a NamedWritable by the plugin, leading to serialization errors when `SAMPLE` is used with `profile: true`. It leads to an `IllegalArgumentException: Unknown NamedWriteable [org.elasticsearch.compute.operator.Operator$Status][sample]` Profiles will be tested in this PR: https://github.com/elastic/elasticsearch/pull/131474, that's currently failing because of this bug --- docs/changelog/131541.yaml | 5 ++ .../org/elasticsearch/TransportVersions.java | 1 + .../compute/operator/SampleOperator.java | 30 +++++--- .../operator/SampleOperatorStatusTests.java | 72 +++++++++++++++++++ .../xpack/esql/plugin/EsqlPlugin.java | 2 + 5 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 docs/changelog/131541.yaml create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SampleOperatorStatusTests.java diff --git a/docs/changelog/131541.yaml b/docs/changelog/131541.yaml new file mode 100644 index 0000000000000..5c658c194385a --- /dev/null +++ b/docs/changelog/131541.yaml @@ -0,0 +1,5 @@ +pr: 131541 +summary: Added Sample operator `NamedWritable` to plugin +area: ES|QL +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 3e6ff54140700..9720564a7a520 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -330,6 +330,7 @@ static TransportVersion def(int id) { public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00); public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01); public static final TransportVersion ESQL_FIXED_INDEX_LIKE_9_1 = def(9_112_0_02); + public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS_9_1 = def(9_112_0_03); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SampleOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SampleOperator.java index 56ba95f66f5fa..47d0be6c314e5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SampleOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SampleOperator.java @@ -66,11 +66,11 @@ public String describe() { private final RandomSamplingQuery.RandomSamplingIterator randomSamplingIterator; private boolean finished; - private int pagesProcessed = 0; - private int rowsReceived = 0; - private int rowsEmitted = 0; private long collectNanos; private long emitNanos; + private int pagesProcessed = 0; + private long rowsReceived = 0; + private long rowsEmitted = 0; private SampleOperator(double probability, int seed) { finished = false; @@ -109,7 +109,7 @@ private void createOutputPage(Page page) { final int[] sampledPositions = new int[page.getPositionCount()]; int sampledIdx = 0; for (int i = randomSamplingIterator.docID(); i - rowsReceived < page.getPositionCount(); i = randomSamplingIterator.nextDoc()) { - sampledPositions[sampledIdx++] = i - rowsReceived; + sampledPositions[sampledIdx++] = Math.toIntExact(i - rowsReceived); } if (sampledIdx > 0) { outputPages.add(page.filter(Arrays.copyOf(sampledPositions, sampledIdx))); @@ -167,7 +167,7 @@ public Operator.Status status() { return new Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted); } - private record Status(long collectNanos, long emitNanos, int pagesProcessed, int rowsReceived, int rowsEmitted) + public record Status(long collectNanos, long emitNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) implements Operator.Status { @@ -178,7 +178,13 @@ private record Status(long collectNanos, long emitNanos, int pagesProcessed, int ); Status(StreamInput streamInput) throws IOException { - this(streamInput.readVLong(), streamInput.readVLong(), streamInput.readVInt(), streamInput.readVInt(), streamInput.readVInt()); + this( + streamInput.readVLong(), + streamInput.readVLong(), + streamInput.readVInt(), + streamInput.readVLong(), + streamInput.readVLong() + ); } @Override @@ -186,8 +192,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(collectNanos); out.writeVLong(emitNanos); out.writeVInt(pagesProcessed); - out.writeVInt(rowsReceived); - out.writeVInt(rowsEmitted); + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); } @Override @@ -236,7 +242,13 @@ public String toString() { @Override public TransportVersion getMinimalSupportedVersion() { - return TransportVersions.ZERO; + assert false : "must not be called when overriding supportsVersion"; + throw new UnsupportedOperationException("must not be called when overriding supportsVersion"); + } + + @Override + public boolean supportsVersion(TransportVersion version) { + return version.onOrAfter(TransportVersions.ESQL_SAMPLE_OPERATOR_STATUS_9_1); } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SampleOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SampleOperatorStatusTests.java new file mode 100644 index 0000000000000..50f3f456f3745 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SampleOperatorStatusTests.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class SampleOperatorStatusTests extends AbstractWireSerializingTestCase { + public static SampleOperator.Status simple() { + return new SampleOperator.Status(500012, 200012, 123, 111, 222); + } + + public static String simpleToJson() { + return """ + { + "collect_nanos" : 500012, + "collect_time" : "500micros", + "emit_nanos" : 200012, + "emit_time" : "200micros", + "pages_processed" : 123, + "rows_received" : 111, + "rows_emitted" : 222 + }"""; + } + + public void testToXContent() { + assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson())); + } + + @Override + protected Writeable.Reader instanceReader() { + return SampleOperator.Status::new; + } + + @Override + public SampleOperator.Status createTestInstance() { + return new SampleOperator.Status( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); + } + + @Override + protected SampleOperator.Status mutateInstance(SampleOperator.Status instance) { + long collectNanos = instance.collectNanos(); + long emitNanos = instance.emitNanos(); + int pagesProcessed = instance.pagesProcessed(); + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 4)) { + case 0 -> collectNanos = randomValueOtherThan(collectNanos, ESTestCase::randomNonNegativeLong); + case 1 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong); + case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + default -> throw new UnsupportedOperationException(); + } + return new SampleOperator.Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 7cba5eeb56278..1374b38c15999 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -33,6 +33,7 @@ import org.elasticsearch.compute.operator.HashAggregationOperator; import org.elasticsearch.compute.operator.LimitOperator; import org.elasticsearch.compute.operator.MvExpandOperator; +import org.elasticsearch.compute.operator.SampleOperator; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator; @@ -328,6 +329,7 @@ public List getNamedWriteables() { entries.add(AsyncOperator.Status.ENTRY); entries.add(EnrichLookupOperator.Status.ENTRY); entries.add(LookupFromIndexOperator.Status.ENTRY); + entries.add(SampleOperator.Status.ENTRY); entries.add(ExpressionQueryBuilder.ENTRY); entries.add(PlanStreamWrapperQueryBuilder.ENTRY);