-
Notifications
You must be signed in to change notification settings - Fork 25.7k
ESQL: Fix AsyncOperator status values and add emitted rows #132738
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
283c2d5
010d915
262f488
35d2ca2
916f591
5d74c2d
007fd4a
0da2e4e
55f8804
5535503
ba84ddd
4afdc80
121ce8a
a775efb
5dbbd22
1b5ae7d
bf4aea8
e2de5ed
6a505b0
25ef602
e8e9226
9c64a0b
523d542
b56abca
675d171
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,8 @@ | |
| import org.elasticsearch.common.xcontent.XContentHelper; | ||
| import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; | ||
| import org.elasticsearch.compute.data.BlockFactory; | ||
| import org.elasticsearch.compute.data.Page; | ||
| import org.elasticsearch.compute.operator.AsyncOperator; | ||
| import org.elasticsearch.compute.operator.DriverContext; | ||
| import org.elasticsearch.compute.operator.Operator; | ||
| import org.elasticsearch.compute.operator.SinkOperator; | ||
|
|
@@ -20,12 +22,19 @@ | |
| import org.elasticsearch.xcontent.XContentBuilder; | ||
| import org.elasticsearch.xcontent.XContentType; | ||
| import org.hamcrest.Matcher; | ||
| import org.hamcrest.Matchers; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import static org.hamcrest.Matchers.both; | ||
| import static org.hamcrest.Matchers.comparesEqualTo; | ||
| import static org.hamcrest.Matchers.either; | ||
| import static org.hamcrest.Matchers.hasKey; | ||
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
| import static org.hamcrest.Matchers.hasEntry; | ||
| import static org.hamcrest.Matchers.instanceOf; | ||
| import static org.hamcrest.Matchers.is; | ||
| import static org.hamcrest.Matchers.matchesPattern; | ||
|
|
||
| /** | ||
|
|
@@ -105,37 +114,107 @@ public final void testSimpleToString() { | |
| } | ||
|
|
||
| /** | ||
| * Ensures that the Operator.Status of this operator has the standard fields. | ||
| * Ensures that the Operator.Status of this operator has the standard fields, set to 0. | ||
| */ | ||
| public void testOperatorStatus() throws IOException { | ||
| public void testEmptyOperatorStatus() { | ||
| DriverContext driverContext = driverContext(); | ||
| try (var operator = simple().get(driverContext)) { | ||
| Operator.Status status = operator.status(); | ||
|
|
||
| assumeTrue("Operator does not provide a status", status != null); | ||
|
|
||
| var xContent = XContentType.JSON.xContent(); | ||
| try (var xContentBuilder = XContentBuilder.builder(xContent)) { | ||
| status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); | ||
|
|
||
| var bytesReference = BytesReference.bytes(xContentBuilder); | ||
| var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2(); | ||
|
|
||
| if (operator instanceof SourceOperator) { | ||
| assertThat(map, hasKey("pages_emitted")); | ||
| assertThat(map, hasKey("rows_emitted")); | ||
| } else if (operator instanceof SinkOperator) { | ||
| assertThat(map, hasKey("pages_received")); | ||
| assertThat(map, hasKey("rows_received")); | ||
| } else { | ||
| assertThat(map, either(hasKey("pages_processed")).or(both(hasKey("pages_received")).and(hasKey("pages_emitted")))); | ||
| assertThat(map, hasKey("rows_received")); | ||
| assertThat(map, hasKey("rows_emitted")); | ||
| } | ||
| assertOperatorStatus(operator, List.of(), List.of()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Assert the operator has a status, and its values are acceptable. | ||
| * Delegates specific checks to {link #checkOperatorStatusFields}, which may be overridden. | ||
| */ | ||
| protected void assertOperatorStatus(Operator operator, List<Page> input, List<Page> output) { | ||
| Operator.Status status = operator.status(); | ||
|
|
||
| if (status == null) { | ||
| // Operator doesn't provide a status | ||
| return; | ||
| } | ||
|
|
||
| var xContent = XContentType.JSON.xContent(); | ||
| try (var xContentBuilder = XContentBuilder.builder(xContent)) { | ||
| status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); | ||
|
|
||
| var bytesReference = BytesReference.bytes(xContentBuilder); | ||
| var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2(); | ||
|
|
||
| // For some operators, we don't know here if they consumed/processed all the input. | ||
| // That check should be done in the operator test | ||
| var nonNegativeMatcher = input.isEmpty() ? matchNumberEqualTo(0) : matchNumberGreaterThanOrEqualTo(0); | ||
| var inputPagesMatcher = matchNumberEqualTo(input.size()); | ||
| var totalInputRows = input.stream().mapToLong(Page::getPositionCount).sum(); | ||
| var inputRowsMatcher = matchNumberEqualTo(totalInputRows); | ||
| var outputPagesMatcher = matchNumberEqualTo(output.size()); | ||
| var totalOutputRows = output.stream().mapToLong(Page::getPositionCount).sum(); | ||
| var outputRowsMatcher = matchNumberEqualTo(totalOutputRows); | ||
|
|
||
| if (operator instanceof SourceOperator) { | ||
| assertThat(map, hasEntry(is("pages_emitted"), outputPagesMatcher)); | ||
| assertThat(map, hasEntry(is("rows_emitted"), outputRowsMatcher)); | ||
| } else if (operator instanceof SinkOperator) { | ||
| assertThat(map, hasEntry(is("pages_received"), inputPagesMatcher)); | ||
| assertThat(map, hasEntry(is("rows_received"), inputRowsMatcher)); | ||
| } else if (operator instanceof AsyncOperator) { | ||
| assertThat(map, hasEntry(is("received_pages"), nonNegativeMatcher)); | ||
| assertThat(map, hasEntry(is("completed_pages"), nonNegativeMatcher)); | ||
| assertThat(map, hasEntry(is("process_nanos"), nonNegativeMatcher)); | ||
|
||
| } else { | ||
| assertThat( | ||
| map, | ||
| Matchers.<Map<String, Object>>either(hasEntry(is("pages_processed"), nonNegativeMatcher)) | ||
| .or( | ||
| Matchers.<Map<String, Object>>both(hasEntry(is("pages_received"), nonNegativeMatcher)) | ||
| .and(hasEntry(is("pages_emitted"), outputPagesMatcher)) | ||
| ) | ||
| ); | ||
| assertThat(map, hasEntry(is("rows_received"), nonNegativeMatcher)); | ||
| assertThat(map, hasEntry(is("rows_emitted"), outputRowsMatcher)); | ||
| } | ||
|
|
||
| checkOperatorStatusFields(map, input, output); | ||
| } catch (IOException e) { | ||
| fail(e, "Failed to convert operator status to XContent"); | ||
| } | ||
| } | ||
|
|
||
| protected final Matcher<Object> matchNumberEqualTo(Number value) { | ||
| return wholeMatcher(comparesEqualTo(value.intValue()), comparesEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| protected final Matcher<Object> matchNumberGreaterThanOrEqualTo(Number value) { | ||
| return wholeMatcher(greaterThanOrEqualTo(value.intValue()), greaterThanOrEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| protected final Matcher<Object> wholeMatcher(Matcher<Integer> integerMatcher, Matcher<Long> longMatcher) { | ||
| return either(both(instanceOf(Integer.class)).and((Matcher<? super Object>) (Matcher<?>) integerMatcher)).or( | ||
| both(instanceOf(Long.class)).and((Matcher<? super Object>) (Matcher<?>) longMatcher) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Tests the non-standard operator status fields. | ||
| * <p> | ||
| * The standard fields (already tested in the generic test) are: | ||
| * </p> | ||
| * <ul> | ||
| * <li>pages_received</li> | ||
| * <li>rows_received</li> | ||
| * <li>pages_processed</li> | ||
| * <li>pages_emitted</li> | ||
| * <li>rows_emitted</li> | ||
| * </ul> | ||
| * <p> | ||
| * To be overridden by subclasses. | ||
| * </p> | ||
| * @param status The XContent map representation of the status. | ||
| */ | ||
| protected void checkOperatorStatusFields(Map<String, Object> status, List<Page> input, List<Page> output) {} | ||
|
|
||
| /** | ||
| * A {@link DriverContext} with a nonBreakingBigArrays. | ||
| */ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 These are correct usages.