diff --git a/docs/changelog/132738.yaml b/docs/changelog/132738.yaml new file mode 100644 index 0000000000000..86afd5f00fb56 --- /dev/null +++ b/docs/changelog/132738.yaml @@ -0,0 +1,5 @@ +pr: 132738 +summary: Fix `AsyncOperator` status values and add emitted rows +area: ES|QL +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index bc29db8d7e2ec..1338339b01173 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -367,6 +367,7 @@ static TransportVersion def(int id) { public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00); public static final TransportVersion DATA_STREAM_WRITE_INDEX_ONLY_SETTINGS = def(9_142_0_00); public static final TransportVersion SCRIPT_RESCORER = def(9_143_0_00); + public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_144_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java index b53fb37d3f362..07f4f8b332157 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java @@ -238,7 +238,7 @@ public IsBlockedResult isBlocked() { @Override public final Operator.Status status() { - return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum()); + return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum()); } protected Operator.Status status(long receivedPages, long completedPages, long processNanos) { @@ -289,7 +289,7 @@ public long completedPages() { return completedPages; } - public long procesNanos() { + public long processNanos() { return processNanos; } @@ -310,8 +310,8 @@ protected final XContentBuilder innerToXContent(XContentBuilder builder) throws if (builder.humanReadable()) { builder.field("process_time", TimeValue.timeValueNanos(processNanos)); } - builder.field("received_pages", receivedPages); - builder.field("completed_pages", completedPages); + builder.field("pages_received", receivedPages); + builder.field("pages_completed", completedPages); return builder; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java index 1990728e128f0..54be5889aa697 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java @@ -39,17 +39,17 @@ protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IO case 0 -> new AsyncOperator.Status( randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong), in.completedPages(), - in.procesNanos() + in.processNanos() ); case 1 -> new AsyncOperator.Status( in.receivedPages(), randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong), - in.procesNanos() + in.processNanos() ); case 2 -> new AsyncOperator.Status( in.receivedPages(), in.completedPages(), - randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong) + randomValueOtherThan(in.processNanos(), ESTestCase::randomNonNegativeLong) ); default -> throw new AssertionError("unknown "); }; @@ -62,8 +62,8 @@ public void testToXContent() { { "process_nanos" : 10, "process_time" : "10nanos", - "received_pages" : 100, - "completed_pages" : 50 + "pages_received" : 100, + "pages_completed" : 50 }""")); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ChangePointOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ChangePointOperatorTests.java index c383028900199..a98a3f6d9dd67 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ChangePointOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ChangePointOperatorTests.java @@ -87,7 +87,7 @@ protected Matcher expectedToStringOfSimple() { } @Override - protected void assertEmptyStatus(Map map) { + protected void assertStatus(Map map, List input, List output) { assertThat(map, nullValue()); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorStatusTests.java similarity index 96% rename from x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitStatusTests.java rename to x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorStatusTests.java index 016c8a85f94cd..7ed4982f555d1 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorStatusTests.java @@ -16,7 +16,7 @@ import static org.hamcrest.Matchers.equalTo; -public class LimitStatusTests extends AbstractWireSerializingTestCase { +public class LimitOperatorStatusTests extends AbstractWireSerializingTestCase { public void testToXContent() { assertThat(Strings.toString(new LimitOperator.Status(10, 1, 1, 111, 222)), equalTo(""" {"limit":10,"limit_remaining":1,"pages_processed":1,"rows_received":111,"rows_emitted":222}""")); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java index 8740ec8135783..50666d0d428e3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java @@ -18,10 +18,13 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.LongStream; import static org.elasticsearch.compute.test.RandomBlock.randomElementType; +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; @@ -192,4 +195,17 @@ Block randomBlock(BlockFactory blockFactory, int size) { } return RandomBlock.randomBlock(blockFactory, randomElementType(), size, false, 1, 1, 0, 0).block(); } + + @Override + protected final void assertStatus(Map map, List input, List output) { + var emittedRows = output.stream().mapToInt(Page::getPositionCount).sum(); + + var mapMatcher = matchesMap().entry("rows_received", emittedRows) + .entry("pages_processed", output.size()) + .entry("rows_emitted", emittedRows) + .entry("limit", 100) + .entry("limit_remaining", 100 - emittedRows); + + assertMap(map, mapMatcher); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OutputOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OutputOperatorTests.java index 3116b247020f4..f6d77b42477ec 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OutputOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OutputOperatorTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.test.AnyOperatorTestCase; import org.hamcrest.Matcher; @@ -52,7 +53,7 @@ public void testBigDescription() { } @Override - protected void assertEmptyStatus(Map map) { + protected void assertStatus(Map map, List input, List output) { assertThat(map, nullValue()); } } diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java index d047a01ec7db6..e1a2110111d1e 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AnyOperatorTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.SourceOperator; @@ -23,11 +24,14 @@ import org.hamcrest.Matcher; import java.io.IOException; +import java.util.List; import java.util.Map; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.matchesPattern; +import static org.hamcrest.Matchers.notNullValue; /** * Superclass for testing any {@link Operator}, including {@link SourceOperator}s. @@ -106,39 +110,58 @@ public final void testSimpleToString() { } /** - * Ensures that the Operator.Status of this operator has the standard fields. + * Ensures that the Operator.Status of this operator has the standard fields, set to 0. */ - public final void testOperatorStatus() throws IOException { + public void testEmptyOperatorStatus() { DriverContext driverContext = driverContext(); try (var operator = simple().get(driverContext)) { - Operator.Status status = operator.status(); - if (status == null) { - assertEmptyStatus(null); - return; - } + assertOperatorStatus(operator, List.of(), List.of()); + } + } - try (var xContentBuilder = XContentBuilder.builder(XContentType.JSON.xContent())) { - status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + /** + * Extracts and asserts the operator status. + */ + protected final void assertOperatorStatus(Operator operator, List input, List output) { + Operator.Status status = operator.status(); - var bytesReference = BytesReference.bytes(xContentBuilder); - var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2(); + if (status == null) { + assertStatus(null, input, output); + return; + } - assertEmptyStatus(map); - } + var xContent = XContentType.JSON.xContent(); + try (var xContentBuilder = XContentBuilder.builder(xContent)) { + status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + + var bytesReference = BytesReference.bytes(xContentBuilder); + var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2(); + + assertStatus(map, input, output); + } catch (IOException e) { + fail(e, "Failed to convert operator status to XContent"); } } /** * Assert that the status is sane. + *

+ * This method should be overridden with custom logics and for better assertions and for operators without status. + *

*/ - protected void assertEmptyStatus(@Nullable Map map) { + protected void assertStatus(@Nullable Map map, List input, List output) { + assertThat(map, notNullValue()); + + var totalInputRows = input.stream().mapToInt(Page::getPositionCount).sum(); + var totalOutputRows = output.stream().mapToInt(Page::getPositionCount).sum(); + MapMatcher matcher = matchesMap().extraOk(); if (map.containsKey("pages_processed")) { - matcher = matcher.entry("pages_processed", 0); + matcher = matcher.entry("pages_processed", greaterThanOrEqualTo(0)); } else { - matcher = matcher.entry("pages_received", 0).entry("pages_emitted", 0); + matcher = matcher.entry("pages_received", input.size()).entry("pages_emitted", output.size()); } - matcher = matcher.entry("rows_received", 0).entry("rows_emitted", 0); + matcher = matcher.entry("rows_received", totalInputRows).entry("rows_emitted", totalOutputRows); assertMap(map, matcher); } diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AsyncOperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AsyncOperatorTestCase.java new file mode 100644 index 0000000000000..db091d5b4ebcd --- /dev/null +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AsyncOperatorTestCase.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.test; + +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.test.MapMatcher; +import org.hamcrest.Matcher; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public abstract class AsyncOperatorTestCase extends OperatorTestCase { + @Override + @SuppressWarnings("unchecked") + protected final void assertStatus(Map map, List input, List output) { + var mapMatcher = matchesMap().entry("pages_received", input.size()) + .entry("pages_completed", input.size()) + .entry("process_nanos", either(greaterThanOrEqualTo(0)).or((Matcher) (Matcher) greaterThanOrEqualTo(0L))); + + mapMatcher = extendStatusMatcher(mapMatcher, input, output); + + assertMap(map, mapMatcher); + } + + protected MapMatcher extendStatusMatcher(MapMatcher mapMatcher, List input, List output) { + return mapMatcher; + } +} diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java index f9f9769929f28..d3a2e173ed3da 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java @@ -224,6 +224,7 @@ protected final void assertSimple(DriverContext context, int size) { var operator = simple().get(context); List results = drive(operator, input.iterator(), context); assertSimpleOutput(origInput, results); + assertOperatorStatus(operator, origInput, results); assertThat(context.breaker().getUsed(), equalTo(0L)); // Release all result blocks. After this, all input blocks should be released as well, otherwise we have a leak. diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/SourceOperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/SourceOperatorTestCase.java index 904aa8882c857..906c998300d88 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/SourceOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/SourceOperatorTestCase.java @@ -7,6 +7,9 @@ package org.elasticsearch.compute.test; +import org.elasticsearch.compute.data.Page; + +import java.util.List; import java.util.Map; import static org.elasticsearch.test.MapMatcher.assertMap; @@ -14,7 +17,12 @@ public abstract class SourceOperatorTestCase extends AnyOperatorTestCase { @Override - protected void assertEmptyStatus(Map map) { - assertMap(map, matchesMap().extraOk().entry("pages_emitted", 0).entry("rows_emitted", 0)); + protected void assertStatus(Map map, List input, List output) { + assertMap( + map, + matchesMap().extraOk() + .entry("pages_emitted", output.size()) + .entry("rows_emitted", output.stream().mapToInt(Page::getPositionCount).sum()) + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index b1313a3713b3b..6f2f37119210e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.enrich; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -94,6 +95,10 @@ public Operator get(DriverContext driverContext) { * Total number of pages emitted by this {@link Operator}. */ private long emittedPages = 0L; + /** + * Total number of rows emitted by this {@link Operator}. + */ + private long emittedRows = 0L; /** * The ongoing join or {@code null} none is ongoing at the moment. */ @@ -170,7 +175,9 @@ public Page getOutput() { Page right = ongoing.itr.next(); emittedPages++; try { - return ongoing.join.join(right); + Page joinedPage = ongoing.join.join(right); + emittedRows += joinedPage.getPositionCount(); + return joinedPage; } finally { right.releaseBlocks(); } @@ -183,6 +190,7 @@ public Page getOutput() { return null; } emittedPages++; + emittedRows += remaining.get().getPositionCount(); return remaining.get(); } @@ -229,7 +237,7 @@ protected void doClose() { @Override protected Operator.Status status(long receivedPages, long completedPages, long processNanos) { - return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalRows, emittedPages); + return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalRows, emittedPages, emittedRows); } public static class Status extends AsyncOperator.Status { @@ -244,17 +252,27 @@ public static class Status extends AsyncOperator.Status { * Total number of pages emitted by this {@link Operator}. */ private final long emittedPages; + /** + * Total number of rows emitted by this {@link Operator}. + */ + private final long emittedRows; - Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalRows, long emittedPages) { - super(receivedPages, completedPages, totalTimeInMillis); + Status(long receivedPages, long completedPages, long processNanos, long totalRows, long emittedPages, long emittedRows) { + super(receivedPages, completedPages, processNanos); this.totalRows = totalRows; this.emittedPages = emittedPages; + this.emittedRows = emittedRows; } Status(StreamInput in) throws IOException { super(in); this.totalRows = in.readVLong(); this.emittedPages = in.readVLong(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_OPERATOR_EMITTED_ROWS)) { + this.emittedRows = in.readVLong(); + } else { + this.emittedRows = 0L; + } } @Override @@ -262,6 +280,10 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(totalRows); out.writeVLong(emittedPages); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_OPERATOR_EMITTED_ROWS)) { + out.writeVLong(emittedRows); + } + } @Override @@ -273,6 +295,10 @@ public long emittedPages() { return emittedPages; } + public long emittedRows() { + return emittedPages; + } + public long totalRows() { return totalRows; } @@ -281,8 +307,9 @@ public long totalRows() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); super.innerToXContent(builder); - builder.field("emitted_pages", emittedPages()); - builder.field("total_rows", totalRows()); + builder.field("pages_emitted", emittedPages); + builder.field("rows_emitted", emittedRows); + builder.field("total_rows", totalRows); return builder.endObject(); } @@ -295,12 +322,12 @@ public boolean equals(Object o) { return false; } Status status = (Status) o; - return totalRows == status.totalRows && emittedPages == status.emittedPages; + return totalRows == status.totalRows && emittedPages == status.emittedPages && emittedRows == status.emittedRows; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), totalRows, emittedPages); + return Objects.hash(super.hashCode(), totalRows, emittedPages, emittedRows); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java index 5f30cc860cae1..59e860a8314ec 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java @@ -41,25 +41,25 @@ protected EnrichLookupOperator.Status mutateInstance(EnrichLookupOperator.Status randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong), in.completedPages(), in.totalTerms, - in.procesNanos() + in.processNanos() ); case 1 -> new EnrichLookupOperator.Status( in.receivedPages(), randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong), in.totalTerms, - in.procesNanos() + in.processNanos() ); case 2 -> new EnrichLookupOperator.Status( in.receivedPages(), in.completedPages(), randomValueOtherThan(in.totalTerms, ESTestCase::randomNonNegativeLong), - in.procesNanos() + in.processNanos() ); case 3 -> new EnrichLookupOperator.Status( in.receivedPages(), in.completedPages(), in.totalTerms, - randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong) + randomValueOtherThan(in.processNanos(), ESTestCase::randomNonNegativeLong) ); default -> throw new AssertionError("unknown "); }; @@ -72,8 +72,8 @@ public void testToXContent() { { "process_nanos" : 10000, "process_time" : "10micros", - "received_pages" : 100, - "completed_pages" : 50, + "pages_received" : 100, + "pages_completed" : 50, "total_terms" : 120 }""")); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java index c65d240275089..3e1a4a782cb24 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java @@ -30,6 +30,7 @@ protected LookupFromIndexOperator.Status createTestInstance() { randomNonNegativeLong(), randomLongBetween(0, TimeValue.timeValueHours(1).millis()), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong() ); } @@ -38,30 +39,33 @@ protected LookupFromIndexOperator.Status createTestInstance() { protected LookupFromIndexOperator.Status mutateInstance(LookupFromIndexOperator.Status in) throws IOException { long receivedPages = in.receivedPages(); long completedPages = in.completedPages(); - long procesNanos = in.procesNanos(); - long totalTerms = in.totalRows(); + long procesNanos = in.processNanos(); + long totalRows = in.totalRows(); long emittedPages = in.emittedPages(); - switch (randomIntBetween(0, 4)) { + long emittedRows = in.emittedRows(); + switch (randomIntBetween(0, 5)) { case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong); case 1 -> completedPages = randomValueOtherThan(completedPages, ESTestCase::randomNonNegativeLong); case 2 -> procesNanos = randomValueOtherThan(procesNanos, ESTestCase::randomNonNegativeLong); - case 3 -> totalTerms = randomValueOtherThan(totalTerms, ESTestCase::randomNonNegativeLong); + case 3 -> totalRows = randomValueOtherThan(totalRows, ESTestCase::randomNonNegativeLong); case 4 -> emittedPages = randomValueOtherThan(emittedPages, ESTestCase::randomNonNegativeLong); + case 5 -> emittedRows = randomValueOtherThan(emittedRows, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new LookupFromIndexOperator.Status(receivedPages, completedPages, procesNanos, totalTerms, emittedPages); + return new LookupFromIndexOperator.Status(receivedPages, completedPages, procesNanos, totalRows, emittedPages, emittedRows); } public void testToXContent() { - var status = new LookupFromIndexOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis(), 120, 88); + var status = new LookupFromIndexOperator.Status(100, 50, TimeValue.timeValueNanos(10).nanos(), 120, 88, 800); String json = Strings.toString(status, true, true); assertThat(json, equalTo(""" { - "process_nanos" : 10000, - "process_time" : "10micros", - "received_pages" : 100, - "completed_pages" : 50, - "emitted_pages" : 88, + "process_nanos" : 10, + "process_time" : "10nanos", + "pages_received" : 100, + "pages_completed" : 50, + "pages_emitted" : 88, + "rows_emitted" : 800, "total_rows" : 120 }""")); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index 43e6e914d03a3..5f3d22957731a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -37,8 +37,8 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.compute.test.AsyncOperatorTestCase; import org.elasticsearch.compute.test.NoOpReleasable; -import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator; import org.elasticsearch.compute.test.TupleLongLongBlockSourceOperator; import org.elasticsearch.core.IOUtils; @@ -56,6 +56,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; @@ -84,7 +85,7 @@ import static org.hamcrest.Matchers.matchesPattern; import static org.mockito.Mockito.mock; -public class LookupFromIndexOperatorTests extends OperatorTestCase { +public class LookupFromIndexOperatorTests extends AsyncOperatorTestCase { private static final int LOOKUP_SIZE = 1000; private final ThreadPool threadPool = threadPool(); private final Directory lookupIndexDirectory = newDirectory(); @@ -304,7 +305,10 @@ public void release() { } @Override - protected void assertEmptyStatus(Map map) { - assumeFalse("not yet standardized", true); + protected MapMatcher extendStatusMatcher(MapMatcher mapMatcher, List input, List output) { + var totalInputRows = input.stream().mapToInt(Page::getPositionCount).sum(); + var totalOutputRows = output.stream().mapToInt(Page::getPositionCount).sum(); + + return mapMatcher.entry("total_rows", totalInputRows).entry("pages_emitted", output.size()).entry("rows_emitted", totalOutputRows); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java index a95935c1e22e7..07f367e615a5b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java @@ -28,7 +28,7 @@ import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.compute.test.AbstractBlockSourceOperator; -import org.elasticsearch.compute.test.OperatorTestCase; +import org.elasticsearch.compute.test.AsyncOperatorTestCase; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.inference.InferenceServiceResults; @@ -41,16 +41,12 @@ import org.junit.After; import org.junit.Before; -import java.util.Map; import java.util.function.BiFunction; import java.util.function.Consumer; -import static org.elasticsearch.test.MapMatcher.assertMap; -import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public abstract class InferenceOperatorTestCase extends OperatorTestCase { +public abstract class InferenceOperatorTestCase extends AsyncOperatorTestCase { protected ThreadPool threadPool; protected int inputsCount; @@ -120,11 +116,6 @@ protected Page createPage(int positionOffset, int length) { }; } - @Override - protected void assertEmptyStatus(Map map) { - assertMap(map, matchesMap().entry("received_pages", 0).entry("completed_pages", 0).entry("process_nanos", greaterThanOrEqualTo(0))); - } - @SuppressWarnings("unchecked") protected InferenceService mockedInferenceService() { Client mockClient = new NoOpClient(threadPool) {