Skip to content

Commit 508e0bc

Browse files
committed
Added SampleOperator Status serialization test, and changed its rows stats to be longs
1 parent 2bbea48 commit 508e0bc

File tree

2 files changed

+81
-8
lines changed

2 files changed

+81
-8
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SampleOperator.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ public String describe() {
6666
private final RandomSamplingQuery.RandomSamplingIterator randomSamplingIterator;
6767
private boolean finished;
6868

69-
private int pagesProcessed = 0;
70-
private int rowsReceived = 0;
71-
private int rowsEmitted = 0;
7269
private long collectNanos;
7370
private long emitNanos;
71+
private int pagesProcessed = 0;
72+
private long rowsReceived = 0;
73+
private long rowsEmitted = 0;
7474

7575
private SampleOperator(double probability, int seed) {
7676
finished = false;
@@ -109,7 +109,7 @@ private void createOutputPage(Page page) {
109109
final int[] sampledPositions = new int[page.getPositionCount()];
110110
int sampledIdx = 0;
111111
for (int i = randomSamplingIterator.docID(); i - rowsReceived < page.getPositionCount(); i = randomSamplingIterator.nextDoc()) {
112-
sampledPositions[sampledIdx++] = i - rowsReceived;
112+
sampledPositions[sampledIdx++] = Math.toIntExact(i - rowsReceived);
113113
}
114114
if (sampledIdx > 0) {
115115
outputPages.add(page.filter(Arrays.copyOf(sampledPositions, sampledIdx)));
@@ -167,7 +167,7 @@ public Operator.Status status() {
167167
return new Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
168168
}
169169

170-
public record Status(long collectNanos, long emitNanos, int pagesProcessed, int rowsReceived, int rowsEmitted)
170+
public record Status(long collectNanos, long emitNanos, int pagesProcessed, long rowsReceived, long rowsEmitted)
171171
implements
172172
Operator.Status {
173173

@@ -178,16 +178,17 @@ public record Status(long collectNanos, long emitNanos, int pagesProcessed, int
178178
);
179179

180180
Status(StreamInput streamInput) throws IOException {
181-
this(streamInput.readVLong(), streamInput.readVLong(), streamInput.readVInt(), streamInput.readVInt(), streamInput.readVInt());
181+
this(
182+
streamInput.readVLong(), streamInput.readVLong(), streamInput.readVInt(), streamInput.readVLong(), streamInput.readVLong());
182183
}
183184

184185
@Override
185186
public void writeTo(StreamOutput out) throws IOException {
186187
out.writeVLong(collectNanos);
187188
out.writeVLong(emitNanos);
188189
out.writeVInt(pagesProcessed);
189-
out.writeVInt(rowsReceived);
190-
out.writeVInt(rowsEmitted);
190+
out.writeVLong(rowsReceived);
191+
out.writeVLong(rowsEmitted);
191192
}
192193

193194
@Override
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
13+
import org.elasticsearch.test.ESTestCase;
14+
15+
import static org.hamcrest.Matchers.equalTo;
16+
17+
public class SampleOperatorStatusTests extends AbstractWireSerializingTestCase<SampleOperator.Status> {
18+
public static SampleOperator.Status simple() {
19+
return new SampleOperator.Status(500012, 200012, 123, 111, 222);
20+
}
21+
22+
public static String simpleToJson() {
23+
return """
24+
{
25+
"collect_nanos" : 500012,
26+
"collect_time" : "500micros",
27+
"emit_nanos" : 200012,
28+
"emit_time" : "200micros",
29+
"pages_processed" : 123,
30+
"rows_received" : 111,
31+
"rows_emitted" : 222
32+
}""";
33+
}
34+
35+
public void testToXContent() {
36+
assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
37+
}
38+
39+
@Override
40+
protected Writeable.Reader<SampleOperator.Status> instanceReader() {
41+
return SampleOperator.Status::new;
42+
}
43+
44+
@Override
45+
public SampleOperator.Status createTestInstance() {
46+
return new SampleOperator.Status(
47+
randomNonNegativeLong(),
48+
randomNonNegativeLong(),
49+
randomNonNegativeInt(),
50+
randomNonNegativeLong(),
51+
randomNonNegativeLong()
52+
);
53+
}
54+
55+
@Override
56+
protected SampleOperator.Status mutateInstance(SampleOperator.Status instance) {
57+
long collectNanos = instance.collectNanos();
58+
long emitNanos = instance.emitNanos();
59+
int pagesProcessed = instance.pagesProcessed();
60+
long rowsReceived = instance.rowsReceived();
61+
long rowsEmitted = instance.rowsEmitted();
62+
switch (between(0, 4)) {
63+
case 0 -> collectNanos = randomValueOtherThan(collectNanos, ESTestCase::randomNonNegativeLong);
64+
case 1 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
65+
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
66+
case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
67+
case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
68+
default -> throw new UnsupportedOperationException();
69+
}
70+
return new SampleOperator.Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
71+
}
72+
}

0 commit comments

Comments
 (0)