Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131541.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131541
summary: Added Sample operator `NamedWritable` to plugin
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ static TransportVersion def(int id) {
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01);
public static final TransportVersion ESQL_FIXED_INDEX_LIKE_9_1 = def(9_112_0_02);
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS_9_1 = def(9_112_0_03);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ public String describe() {
private final RandomSamplingQuery.RandomSamplingIterator randomSamplingIterator;
private boolean finished;

private int pagesProcessed = 0;
private int rowsReceived = 0;
private int rowsEmitted = 0;
private long collectNanos;
private long emitNanos;
private int pagesProcessed = 0;
private long rowsReceived = 0;
private long rowsEmitted = 0;

private SampleOperator(double probability, int seed) {
finished = false;
Expand Down Expand Up @@ -109,7 +109,7 @@ private void createOutputPage(Page page) {
final int[] sampledPositions = new int[page.getPositionCount()];
int sampledIdx = 0;
for (int i = randomSamplingIterator.docID(); i - rowsReceived < page.getPositionCount(); i = randomSamplingIterator.nextDoc()) {
sampledPositions[sampledIdx++] = i - rowsReceived;
sampledPositions[sampledIdx++] = Math.toIntExact(i - rowsReceived);
}
if (sampledIdx > 0) {
outputPages.add(page.filter(Arrays.copyOf(sampledPositions, sampledIdx)));
Expand Down Expand Up @@ -167,7 +167,7 @@ public Operator.Status status() {
return new Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
}

private record Status(long collectNanos, long emitNanos, int pagesProcessed, int rowsReceived, int rowsEmitted)
public record Status(long collectNanos, long emitNanos, int pagesProcessed, long rowsReceived, long rowsEmitted)
implements
Operator.Status {

Expand All @@ -178,16 +178,22 @@ private record Status(long collectNanos, long emitNanos, int pagesProcessed, int
);

Status(StreamInput streamInput) throws IOException {
this(streamInput.readVLong(), streamInput.readVLong(), streamInput.readVInt(), streamInput.readVInt(), streamInput.readVInt());
this(
streamInput.readVLong(),
streamInput.readVLong(),
streamInput.readVInt(),
streamInput.readVLong(),
streamInput.readVLong()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(collectNanos);
out.writeVLong(emitNanos);
out.writeVInt(pagesProcessed);
out.writeVInt(rowsReceived);
out.writeVInt(rowsEmitted);
out.writeVLong(rowsReceived);
out.writeVLong(rowsEmitted);
}

@Override
Expand Down Expand Up @@ -236,7 +242,13 @@ public String toString() {

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.ZERO;
assert false : "must not be called when overriding supportsVersion";
throw new UnsupportedOperationException("must not be called when overriding supportsVersion");
}

@Override
public boolean supportsVersion(TransportVersion version) {
return version.onOrAfter(TransportVersions.ESQL_SAMPLE_OPERATOR_STATUS_9_1);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class SampleOperatorStatusTests extends AbstractWireSerializingTestCase<SampleOperator.Status> {
public static SampleOperator.Status simple() {
return new SampleOperator.Status(500012, 200012, 123, 111, 222);
}

public static String simpleToJson() {
return """
{
"collect_nanos" : 500012,
"collect_time" : "500micros",
"emit_nanos" : 200012,
"emit_time" : "200micros",
"pages_processed" : 123,
"rows_received" : 111,
"rows_emitted" : 222
}""";
}

public void testToXContent() {
assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
}

@Override
protected Writeable.Reader<SampleOperator.Status> instanceReader() {
return SampleOperator.Status::new;
}

@Override
public SampleOperator.Status createTestInstance() {
return new SampleOperator.Status(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeInt(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
}

@Override
protected SampleOperator.Status mutateInstance(SampleOperator.Status instance) {
long collectNanos = instance.collectNanos();
long emitNanos = instance.emitNanos();
int pagesProcessed = instance.pagesProcessed();
long rowsReceived = instance.rowsReceived();
long rowsEmitted = instance.rowsEmitted();
switch (between(0, 4)) {
case 0 -> collectNanos = randomValueOtherThan(collectNanos, ESTestCase::randomNonNegativeLong);
case 1 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
default -> throw new UnsupportedOperationException();
}
return new SampleOperator.Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.compute.operator.HashAggregationOperator;
import org.elasticsearch.compute.operator.LimitOperator;
import org.elasticsearch.compute.operator.MvExpandOperator;
import org.elasticsearch.compute.operator.SampleOperator;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
Expand Down Expand Up @@ -328,6 +329,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
entries.add(AsyncOperator.Status.ENTRY);
entries.add(EnrichLookupOperator.Status.ENTRY);
entries.add(LookupFromIndexOperator.Status.ENTRY);
entries.add(SampleOperator.Status.ENTRY);
entries.add(ExpressionQueryBuilder.ENTRY);
entries.add(PlanStreamWrapperQueryBuilder.ENTRY);

Expand Down
Loading