Skip to content

Commit 6066ac4

Browse files
authored
ESQL: Fix AsyncOperator status values and add emitted rows (#132738)
Some fixes around `AsyncOperator.Status`: - Fix its pages received/emitted, that were -1 of what they should be (And removed the "Max(0, ...)" to avoid falsifying it - Added the `emitted_rows` field to `LookupFromIndexOperator.Status`, which was missing, and would add interesting data on join results - Added better testing support for Status checks after a Driver run
1 parent 4bcc353 commit 6066ac4

File tree

17 files changed

+190
-71
lines changed

17 files changed

+190
-71
lines changed

docs/changelog/132738.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132738
2+
summary: Fix `AsyncOperator` status values and add emitted rows
3+
area: ES|QL
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ static TransportVersion def(int id) {
367367
public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00);
368368
public static final TransportVersion DATA_STREAM_WRITE_INDEX_ONLY_SETTINGS = def(9_142_0_00);
369369
public static final TransportVersion SCRIPT_RESCORER = def(9_143_0_00);
370+
public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_144_0_00);
370371

371372
/*
372373
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public IsBlockedResult isBlocked() {
238238

239239
@Override
240240
public final Operator.Status status() {
241-
return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
241+
return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum());
242242
}
243243

244244
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
@@ -289,7 +289,7 @@ public long completedPages() {
289289
return completedPages;
290290
}
291291

292-
public long procesNanos() {
292+
public long processNanos() {
293293
return processNanos;
294294
}
295295

@@ -310,8 +310,8 @@ protected final XContentBuilder innerToXContent(XContentBuilder builder) throws
310310
if (builder.humanReadable()) {
311311
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
312312
}
313-
builder.field("received_pages", receivedPages);
314-
builder.field("completed_pages", completedPages);
313+
builder.field("pages_received", receivedPages);
314+
builder.field("pages_completed", completedPages);
315315
return builder;
316316
}
317317

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,17 @@ protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IO
3939
case 0 -> new AsyncOperator.Status(
4040
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
4141
in.completedPages(),
42-
in.procesNanos()
42+
in.processNanos()
4343
);
4444
case 1 -> new AsyncOperator.Status(
4545
in.receivedPages(),
4646
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
47-
in.procesNanos()
47+
in.processNanos()
4848
);
4949
case 2 -> new AsyncOperator.Status(
5050
in.receivedPages(),
5151
in.completedPages(),
52-
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
52+
randomValueOtherThan(in.processNanos(), ESTestCase::randomNonNegativeLong)
5353
);
5454
default -> throw new AssertionError("unknown ");
5555
};
@@ -62,8 +62,8 @@ public void testToXContent() {
6262
{
6363
"process_nanos" : 10,
6464
"process_time" : "10nanos",
65-
"received_pages" : 100,
66-
"completed_pages" : 50
65+
"pages_received" : 100,
66+
"pages_completed" : 50
6767
}"""));
6868
}
6969
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ChangePointOperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ protected Matcher<String> expectedToStringOfSimple() {
8787
}
8888

8989
@Override
90-
protected void assertEmptyStatus(Map<String, Object> map) {
90+
protected void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
9191
assertThat(map, nullValue());
9292
}
9393
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import static org.hamcrest.Matchers.equalTo;
1818

19-
public class LimitStatusTests extends AbstractWireSerializingTestCase<LimitOperator.Status> {
19+
public class LimitOperatorStatusTests extends AbstractWireSerializingTestCase<LimitOperator.Status> {
2020
public void testToXContent() {
2121
assertThat(Strings.toString(new LimitOperator.Status(10, 1, 1, 111, 222)), equalTo("""
2222
{"limit":10,"limit_remaining":1,"pages_processed":1,"rows_received":111,"rows_emitted":222}"""));

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.List;
21+
import java.util.Map;
2122
import java.util.concurrent.atomic.AtomicInteger;
2223
import java.util.stream.LongStream;
2324

2425
import static org.elasticsearch.compute.test.RandomBlock.randomElementType;
26+
import static org.elasticsearch.test.MapMatcher.assertMap;
27+
import static org.elasticsearch.test.MapMatcher.matchesMap;
2528
import static org.hamcrest.Matchers.equalTo;
2629
import static org.hamcrest.Matchers.sameInstance;
2730

@@ -192,4 +195,17 @@ Block randomBlock(BlockFactory blockFactory, int size) {
192195
}
193196
return RandomBlock.randomBlock(blockFactory, randomElementType(), size, false, 1, 1, 0, 0).block();
194197
}
198+
199+
@Override
200+
protected final void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
201+
var emittedRows = output.stream().mapToInt(Page::getPositionCount).sum();
202+
203+
var mapMatcher = matchesMap().entry("rows_received", emittedRows)
204+
.entry("pages_processed", output.size())
205+
.entry("rows_emitted", emittedRows)
206+
.entry("limit", 100)
207+
.entry("limit_remaining", 100 - emittedRows);
208+
209+
assertMap(map, mapMatcher);
210+
}
195211
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OutputOperatorTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.compute.data.Page;
1011
import org.elasticsearch.compute.test.AnyOperatorTestCase;
1112
import org.hamcrest.Matcher;
1213

@@ -52,7 +53,7 @@ public void testBigDescription() {
5253
}
5354

5455
@Override
55-
protected void assertEmptyStatus(Map<String, Object> map) {
56+
protected void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
5657
assertThat(map, nullValue());
5758
}
5859
}

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.xcontent.XContentHelper;
1313
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
1414
import org.elasticsearch.compute.data.BlockFactory;
15+
import org.elasticsearch.compute.data.Page;
1516
import org.elasticsearch.compute.operator.DriverContext;
1617
import org.elasticsearch.compute.operator.Operator;
1718
import org.elasticsearch.compute.operator.SourceOperator;
@@ -23,11 +24,14 @@
2324
import org.hamcrest.Matcher;
2425

2526
import java.io.IOException;
27+
import java.util.List;
2628
import java.util.Map;
2729

2830
import static org.elasticsearch.test.MapMatcher.assertMap;
2931
import static org.elasticsearch.test.MapMatcher.matchesMap;
32+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3033
import static org.hamcrest.Matchers.matchesPattern;
34+
import static org.hamcrest.Matchers.notNullValue;
3135

3236
/**
3337
* Superclass for testing any {@link Operator}, including {@link SourceOperator}s.
@@ -106,39 +110,58 @@ public final void testSimpleToString() {
106110
}
107111

108112
/**
109-
* Ensures that the Operator.Status of this operator has the standard fields.
113+
* Ensures that the Operator.Status of this operator has the standard fields, set to 0.
110114
*/
111-
public final void testOperatorStatus() throws IOException {
115+
public void testEmptyOperatorStatus() {
112116
DriverContext driverContext = driverContext();
113117
try (var operator = simple().get(driverContext)) {
114-
Operator.Status status = operator.status();
115-
if (status == null) {
116-
assertEmptyStatus(null);
117-
return;
118-
}
118+
assertOperatorStatus(operator, List.of(), List.of());
119+
}
120+
}
119121

120-
try (var xContentBuilder = XContentBuilder.builder(XContentType.JSON.xContent())) {
121-
status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
122+
/**
123+
* Extracts and asserts the operator status.
124+
*/
125+
protected final void assertOperatorStatus(Operator operator, List<Page> input, List<Page> output) {
126+
Operator.Status status = operator.status();
122127

123-
var bytesReference = BytesReference.bytes(xContentBuilder);
124-
var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2();
128+
if (status == null) {
129+
assertStatus(null, input, output);
130+
return;
131+
}
125132

126-
assertEmptyStatus(map);
127-
}
133+
var xContent = XContentType.JSON.xContent();
134+
try (var xContentBuilder = XContentBuilder.builder(xContent)) {
135+
status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
136+
137+
var bytesReference = BytesReference.bytes(xContentBuilder);
138+
var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2();
139+
140+
assertStatus(map, input, output);
141+
} catch (IOException e) {
142+
fail(e, "Failed to convert operator status to XContent");
128143
}
129144
}
130145

131146
/**
132147
* Assert that the status is sane.
148+
* <p>
149+
* This method should be overridden with custom logics and for better assertions and for operators without status.
150+
* </p>
133151
*/
134-
protected void assertEmptyStatus(@Nullable Map<String, Object> map) {
152+
protected void assertStatus(@Nullable Map<String, Object> map, List<Page> input, List<Page> output) {
153+
assertThat(map, notNullValue());
154+
155+
var totalInputRows = input.stream().mapToInt(Page::getPositionCount).sum();
156+
var totalOutputRows = output.stream().mapToInt(Page::getPositionCount).sum();
157+
135158
MapMatcher matcher = matchesMap().extraOk();
136159
if (map.containsKey("pages_processed")) {
137-
matcher = matcher.entry("pages_processed", 0);
160+
matcher = matcher.entry("pages_processed", greaterThanOrEqualTo(0));
138161
} else {
139-
matcher = matcher.entry("pages_received", 0).entry("pages_emitted", 0);
162+
matcher = matcher.entry("pages_received", input.size()).entry("pages_emitted", output.size());
140163
}
141-
matcher = matcher.entry("rows_received", 0).entry("rows_emitted", 0);
164+
matcher = matcher.entry("rows_received", totalInputRows).entry("rows_emitted", totalOutputRows);
142165
assertMap(map, matcher);
143166
}
144167

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.test;
9+
10+
import org.elasticsearch.compute.data.Page;
11+
import org.elasticsearch.test.MapMatcher;
12+
import org.hamcrest.Matcher;
13+
14+
import java.util.List;
15+
import java.util.Map;
16+
17+
import static org.elasticsearch.test.MapMatcher.assertMap;
18+
import static org.elasticsearch.test.MapMatcher.matchesMap;
19+
import static org.hamcrest.Matchers.either;
20+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
21+
22+
public abstract class AsyncOperatorTestCase extends OperatorTestCase {
23+
@Override
24+
@SuppressWarnings("unchecked")
25+
protected final void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
26+
var mapMatcher = matchesMap().entry("pages_received", input.size())
27+
.entry("pages_completed", input.size())
28+
.entry("process_nanos", either(greaterThanOrEqualTo(0)).or((Matcher<Integer>) (Matcher<?>) greaterThanOrEqualTo(0L)));
29+
30+
mapMatcher = extendStatusMatcher(mapMatcher, input, output);
31+
32+
assertMap(map, mapMatcher);
33+
}
34+
35+
protected MapMatcher extendStatusMatcher(MapMatcher mapMatcher, List<Page> input, List<Page> output) {
36+
return mapMatcher;
37+
}
38+
}

0 commit comments

Comments
 (0)