-
Notifications
You must be signed in to change notification settings - Fork 25.7k
ESQL: Fix AsyncOperator status values and add emitted rows #132738
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
283c2d5
010d915
262f488
35d2ca2
916f591
5d74c2d
007fd4a
0da2e4e
55f8804
5535503
ba84ddd
4afdc80
121ce8a
a775efb
5dbbd22
1b5ae7d
bf4aea8
e2de5ed
6a505b0
25ef602
e8e9226
9c64a0b
523d542
b56abca
675d171
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 132738 | ||
| summary: Fix `AsyncOperator` status values and add emitted rows | ||
| area: ES|QL | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| import org.elasticsearch.common.xcontent.XContentHelper; | ||
| import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; | ||
| import org.elasticsearch.compute.data.BlockFactory; | ||
| import org.elasticsearch.compute.data.Page; | ||
| import org.elasticsearch.compute.operator.DriverContext; | ||
| import org.elasticsearch.compute.operator.Operator; | ||
| import org.elasticsearch.compute.operator.SourceOperator; | ||
|
|
@@ -23,11 +24,18 @@ | |
| import org.hamcrest.Matcher; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import static org.elasticsearch.test.MapMatcher.assertMap; | ||
| import static org.elasticsearch.test.MapMatcher.matchesMap; | ||
| import static org.hamcrest.Matchers.both; | ||
| import static org.hamcrest.Matchers.comparesEqualTo; | ||
| import static org.hamcrest.Matchers.either; | ||
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
| import static org.hamcrest.Matchers.instanceOf; | ||
| import static org.hamcrest.Matchers.matchesPattern; | ||
| import static org.hamcrest.Matchers.notNullValue; | ||
|
|
||
| /** | ||
| * Superclass for testing any {@link Operator}, including {@link SourceOperator}s. | ||
|
|
@@ -106,42 +114,85 @@ public final void testSimpleToString() { | |
| } | ||
|
|
||
| /** | ||
| * Ensures that the Operator.Status of this operator has the standard fields. | ||
| * Ensures that the Operator.Status of this operator has the standard fields, set to 0. | ||
| */ | ||
| public final void testOperatorStatus() throws IOException { | ||
| public void testEmptyOperatorStatus() { | ||
| DriverContext driverContext = driverContext(); | ||
| try (var operator = simple().get(driverContext)) { | ||
| Operator.Status status = operator.status(); | ||
| if (status == null) { | ||
| assertEmptyStatus(null); | ||
| return; | ||
| } | ||
| assertOperatorStatus(operator, List.of(), List.of()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Extracts and asserts the operator status. | ||
| */ | ||
| protected final void assertOperatorStatus(Operator operator, List<Page> input, List<Page> output) { | ||
| Operator.Status status = operator.status(); | ||
|
|
||
| try (var xContentBuilder = XContentBuilder.builder(XContentType.JSON.xContent())) { | ||
| status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); | ||
| if (status == null) { | ||
| assertStatus(null, input, output); | ||
| return; | ||
| } | ||
|
|
||
| var bytesReference = BytesReference.bytes(xContentBuilder); | ||
| var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2(); | ||
| var xContent = XContentType.JSON.xContent(); | ||
| try (var xContentBuilder = XContentBuilder.builder(xContent)) { | ||
| status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); | ||
|
|
||
| assertEmptyStatus(map); | ||
| } | ||
| var bytesReference = BytesReference.bytes(xContentBuilder); | ||
| var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2(); | ||
|
|
||
| assertStatus(map, input, output); | ||
| } catch (IOException e) { | ||
| fail(e, "Failed to convert operator status to XContent"); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Assert that the status is sane. | ||
| * <p> | ||
| * This method should be overridden with custom logics and for better assertions and for operators without status. | ||
| * </p> | ||
| */ | ||
| protected void assertEmptyStatus(@Nullable Map<String, Object> map) { | ||
| protected void assertStatus(@Nullable Map<String, Object> map, List<Page> input, List<Page> output) { | ||
| assertThat(map, notNullValue()); | ||
|
|
||
| var totalInputRows = input.stream().mapToInt(Page::getPositionCount).sum(); | ||
| var totalOutputRows = output.stream().mapToInt(Page::getPositionCount).sum(); | ||
|
|
||
| MapMatcher matcher = matchesMap().extraOk(); | ||
| if (map.containsKey("pages_processed")) { | ||
| matcher = matcher.entry("pages_processed", 0); | ||
| matcher = matcher.entry("pages_processed", greaterThanOrEqualTo(0)); | ||
| } else { | ||
| matcher = matcher.entry("pages_received", 0).entry("pages_emitted", 0); | ||
| matcher = matcher.entry("pages_received", input.size()).entry("pages_emitted", output.size()); | ||
| } | ||
| matcher = matcher.entry("rows_received", 0).entry("rows_emitted", 0); | ||
| matcher = matcher.entry("rows_received", totalInputRows).entry("rows_emitted", totalOutputRows); | ||
| assertMap(map, matcher); | ||
| } | ||
|
|
||
| /** | ||
| * EqualTo matcher that takes care of whole number types (Integers and longs). | ||
| */ | ||
| protected final Matcher<Object> matchNumberEqualTo(Number value) { | ||
| return wholeMatcher(comparesEqualTo(value.intValue()), comparesEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| /** | ||
| * GreaterThanOrEqualTo matcher that takes care of whole number types (Integers and longs). | ||
| */ | ||
| protected final Matcher<Object> matchNumberGreaterThanOrEqualTo(Number value) { | ||
| return wholeMatcher(greaterThanOrEqualTo(value.intValue()), greaterThanOrEqualTo(value.longValue())); | ||
| } | ||
|
|
||
| /** | ||
| * Matcher that matches based on the number type (Integer or long). | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| protected final Matcher<Object> wholeMatcher(Matcher<Integer> integerMatcher, Matcher<Long> longMatcher) { | ||
| return either(both(instanceOf(Integer.class)).and((Matcher<? super Object>) (Matcher<?>) integerMatcher)).or( | ||
| both(instanceOf(Long.class)).and((Matcher<? super Object>) (Matcher<?>) longMatcher) | ||
| ); | ||
| } | ||
|
||
|
|
||
| /** | ||
| * A {@link DriverContext} with a nonBreakingBigArrays. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.compute.test; | ||
|
|
||
| import org.elasticsearch.compute.data.Page; | ||
| import org.elasticsearch.test.MapMatcher; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import static org.elasticsearch.test.MapMatcher.assertMap; | ||
| import static org.elasticsearch.test.MapMatcher.matchesMap; | ||
|
|
||
| public abstract class AsyncOperatorTestCase extends OperatorTestCase { | ||
| @Override | ||
| protected final void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) { | ||
| var mapMatcher = matchesMap().entry("pages_received", input.size()) | ||
| .entry("pages_completed", input.size()) | ||
| .entry("process_nanos", matchNumberGreaterThanOrEqualTo(0)); | ||
|
|
||
| mapMatcher = extendStatusMatcher(mapMatcher, input, output); | ||
|
|
||
| assertMap(map, mapMatcher); | ||
| } | ||
|
|
||
| protected MapMatcher extendStatusMatcher(MapMatcher mapMatcher, List<Page> input, List<Page> output) { | ||
| return mapMatcher; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 These are correct usages.