diff --git a/docs/changelog/125631.yaml b/docs/changelog/125631.yaml new file mode 100644 index 0000000000000..32917bb1da060 --- /dev/null +++ b/docs/changelog/125631.yaml @@ -0,0 +1,5 @@ +pr: 125631 +summary: Add `documents_found` and `values_loaded` +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 33cc154cd0b75..51482a99dc8b1 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -222,6 +222,7 @@ static TransportVersion def(int id) { public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_0_00); public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_0_00); public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00); + public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_052_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/common/Strings.java b/server/src/main/java/org/elasticsearch/common/Strings.java index b87112386b440..a9cedb7cf7f50 100644 --- a/server/src/main/java/org/elasticsearch/common/Strings.java +++ b/server/src/main/java/org/elasticsearch/common/Strings.java @@ -822,7 +822,7 @@ public static String toString(ChunkedToXContent chunkedToXContent, boolean prett * Allows to configure the params. * Allows to control whether the outputted json needs to be pretty printed and human readable. */ - private static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) { + public static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) { try { XContentBuilder builder = createBuilder(pretty, human); if (toXContent.isFragment()) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 65eb8832227bb..aaa1a92ee16a0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2672,8 +2672,13 @@ protected static MapMatcher getProfileMatcher() { .entry("drivers", instanceOf(List.class)); } - protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial) { + protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) { MapMatcher mapMatcher = matchesMap(); + if (includeDocumentsFound) { + // Older versions may not return documents_found and values_loaded. + mapMatcher = mapMatcher.entry("documents_found", greaterThanOrEqualTo(0)); + mapMatcher = mapMatcher.entry("values_loaded", greaterThanOrEqualTo(0)); + } if (includeMetadata) { mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0)); } @@ -2688,7 +2693,7 @@ protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean in * Create empty result matcher from result, taking into account all metadata items. */ protected static MapMatcher getResultMatcher(Map result) { - return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial")); + return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial"), result.containsKey("documents_found")); } /** diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java index 8d0b8a27f5833..10b91c578a6d6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java @@ -83,7 +83,11 @@ public int getPositionCount() { @Override public int getTotalValueCount() { - throw new UnsupportedOperationException("Composite block"); + int totalValueCount = 0; + for (Block b : blocks) { + totalValueCount += b.getTotalValueCount(); + } + return totalValueCount; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 9bd5af16b094f..9d670854a3045 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -434,6 +434,11 @@ public Map partitioningStrategies return partitioningStrategies; } + @Override + public long documentsFound() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 3bccdf75afac3..8d2465d4664f8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -47,6 +47,8 @@ import java.util.function.IntFunction; import java.util.function.Supplier; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; + /** * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} * and outputs them to a new column. @@ -113,6 +115,7 @@ public record ShardContext(IndexReader reader, Supplier newSourceL private final BlockFactory blockFactory; private final Map readersBuilt = new TreeMap<>(); + private long valuesLoaded; int lastShard = -1; int lastSegment = -1; @@ -165,6 +168,9 @@ public int get(int i) { } } success = true; + for (Block b : blocks) { + valuesLoaded += b.getTotalValueCount(); + } return page.appendBlocks(blocks); } catch (IOException e) { throw new UncheckedIOException(e); @@ -547,7 +553,7 @@ public String toString() { @Override protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { - return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted); + return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); } public static class Status extends AbstractPageMappingOperator.Status { @@ -558,21 +564,34 @@ public static class Status extends AbstractPageMappingOperator.Status { ); private final Map readersBuilt; - - Status(Map readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { + private final long valuesLoaded; + + Status( + Map readersBuilt, + long processNanos, + int pagesProcessed, + long rowsReceived, + long rowsEmitted, + long valuesLoaded + ) { super(processNanos, pagesProcessed, rowsReceived, rowsEmitted); this.readersBuilt = readersBuilt; + this.valuesLoaded = valuesLoaded; } Status(StreamInput in) throws IOException { super(in); readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); + valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeMap(readersBuilt, StreamOutput::writeVInt); + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + out.writeVLong(valuesLoaded); + } } @Override @@ -584,6 +603,11 @@ public Map readersBuilt() { return readersBuilt; } + @Override + public long valuesLoaded() { + return valuesLoaded; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -592,6 +616,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(e.getKey(), e.getValue()); } builder.endObject(); + builder.field("values_loaded", valuesLoaded); innerToXContent(builder); return builder.endObject(); } @@ -600,12 +625,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public boolean equals(Object o) { if (super.equals(o) == false) return false; Status status = (Status) o; - return readersBuilt.equals(status.readersBuilt); + return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), readersBuilt); + return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded); } @Override @@ -710,6 +735,4 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int return factory.newAggregateMetricDoubleBlockBuilder(count); } } - - // TODO tests that mix source loaded fields and doc values in the same block } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java new file mode 100644 index 0000000000000..bafcc076a46e8 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Information returned when one of more {@link Driver}s is completed. + * @param documentsFound The number of documents found by all lucene queries performed by these drivers. + * @param valuesLoaded The number of values loaded from lucene for all drivers. This is + * roughly the number of documents times the number of + * fields per document. Except {@code null} values don't count. + * And multivalued fields count as many times as there are values. + * @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but + * not free so this will be empty if the {@code profile} option was not set in + * the request. + */ +public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List collectedProfiles) implements Writeable { + + /** + * Completion info we use when we didn't properly complete any drivers. + * Usually this is returned with an error, but it's also used when receiving + * responses from very old nodes. + */ + public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of()); + + /** + * Build a {@link DriverCompletionInfo} for many drivers including their profile output. + */ + public static DriverCompletionInfo includingProfiles(List drivers) { + long documentsFound = 0; + long valuesLoaded = 0; + List collectedProfiles = new ArrayList<>(drivers.size()); + for (Driver d : drivers) { + DriverProfile p = d.profile(); + for (OperatorStatus o : p.operators()) { + documentsFound += o.documentsFound(); + valuesLoaded += o.valuesLoaded(); + } + collectedProfiles.add(p); + } + return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); + } + + /** + * Build a {@link DriverCompletionInfo} for many drivers excluding their profile output. + */ + public static DriverCompletionInfo excludingProfiles(List drivers) { + long documentsFound = 0; + long valuesLoaded = 0; + for (Driver d : drivers) { + DriverStatus s = d.status(); + assert s.status() == DriverStatus.Status.DONE; + for (OperatorStatus o : s.completedOperators()) { + documentsFound += o.documentsFound(); + valuesLoaded += o.valuesLoaded(); + } + } + return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of()); + } + + public DriverCompletionInfo(StreamInput in) throws IOException { + this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(documentsFound); + out.writeVLong(valuesLoaded); + out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o)); + } + + public static class Accumulator { + private long documentsFound; + private long valuesLoaded; + private final List collectedProfiles = new ArrayList<>(); + + public void accumulate(DriverCompletionInfo info) { + this.documentsFound += info.documentsFound; + this.valuesLoaded += info.valuesLoaded; + this.collectedProfiles.addAll(info.collectedProfiles); + } + + public DriverCompletionInfo finish() { + return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); + } + } + + public static class AtomicAccumulator { + private final AtomicLong documentsFound = new AtomicLong(); + private final AtomicLong valuesLoaded = new AtomicLong(); + private final List collectedProfiles = Collections.synchronizedList(new ArrayList<>()); + + public void accumulate(DriverCompletionInfo info) { + this.documentsFound.addAndGet(info.documentsFound); + this.valuesLoaded.addAndGet(info.valuesLoaded); + this.collectedProfiles.addAll(info.collectedProfiles); + } + + public DriverCompletionInfo finish() { + return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java index 1abe3087b4d93..1e75b913fb99d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java @@ -104,6 +104,8 @@ public Iterator toXContentChunked(ToXContent.Params params if (b.humanReadable()) { b.field("cpu_time", TimeValue.timeValueNanos(cpuNanos)); } + b.field("documents_found", operators.stream().mapToLong(OperatorStatus::documentsFound).sum()); + b.field("values_loaded", operators.stream().mapToLong(OperatorStatus::valuesLoaded).sum()); b.field("iterations", iterations); return b; }), diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java index d919cc1c18697..c96ac2c4c80b8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java @@ -124,6 +124,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (builder.humanReadable()) { builder.field("cpu_time", TimeValue.timeValueNanos(cpuNanos)); } + builder.field("documents_found", documentsFound()); + builder.field("values_loaded", valuesLoaded()); builder.field("iterations", iterations); builder.field("status", status, params); builder.startArray("completed_operators"); @@ -145,6 +147,34 @@ public String toString() { return Strings.toString(this); } + /** + * The number of documents found by this driver. + */ + public long documentsFound() { + long documentsFound = 0; + for (OperatorStatus s : completedOperators) { + documentsFound += s.documentsFound(); + } + for (OperatorStatus s : activeOperators) { + documentsFound += s.documentsFound(); + } + return documentsFound; + } + + /** + * The number of values loaded by this operator. + */ + public long valuesLoaded() { + long valuesLoaded = 0; + for (OperatorStatus s : completedOperators) { + valuesLoaded += s.valuesLoaded(); + } + for (OperatorStatus s : activeOperators) { + valuesLoaded += s.valuesLoaded(); + } + return valuesLoaded; + } + public enum Status implements Writeable, ToXContentFragment { QUEUED, STARTING, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java index 46e85bec693e8..0a382a40c809c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java @@ -105,5 +105,21 @@ interface OperatorFactory extends Describable { /** * Status of an {@link Operator} to be returned by the tasks API. */ - interface Status extends ToXContentObject, VersionedNamedWriteable {} + interface Status extends ToXContentObject, VersionedNamedWriteable { + /** + * The number of documents found by this operator. Most operators + * don't find documents and will return {@code 0} here. + */ + default long documentsFound() { + return 0; + } + + /** + * The number of values loaded by this operator. Most operators + * don't load values and will return {@code 0} here. + */ + default long valuesLoaded() { + return 0; + } + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java index 6d83338faf7c5..14d3da12ad5d4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java @@ -49,4 +49,26 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public String toString() { return Strings.toString(this); } + + /** + * The number of documents found by this operator. Most operators + * don't find documents and will return {@code 0} here. + */ + public long documentsFound() { + if (status == null) { + return 0; + } + return status.documentsFound(); + } + + /** + * The number of values loaded by this operator. Most operators + * don't load values and will return {@code 0} here. + */ + public long valuesLoaded() { + if (status == null) { + return 0; + } + return status.valuesLoaded(); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java index ed7eded6eeda4..540adf0f2c1bf 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java @@ -12,6 +12,7 @@ import java.util.Arrays; import java.util.List; +import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; @@ -21,19 +22,29 @@ public class CompositeBlockTests extends ComputeTestCase { .filter(e -> e != ElementType.COMPOSITE && e != ElementType.UNKNOWN && e != ElementType.DOC) .toList(); - public static CompositeBlock randomCompositeBlock(BlockFactory blockFactory, int numBlocks, int positionCount) { + public static CompositeBlock randomCompositeBlock( + BlockFactory blockFactory, + Supplier randomElementType, + boolean nullAllowed, + int numBlocks, + int positionCount, + int minValuesPerPosition, + int maxValuesPerPosition, + int minDupsPerPosition, + int maxDupsPerPosition + ) { Block[] blocks = new Block[numBlocks]; for (int b = 0; b < numBlocks; b++) { - ElementType elementType = randomFrom(supportedSubElementTypes); + ElementType elementType = randomElementType.get(); blocks[b] = RandomBlock.randomBlock( blockFactory, elementType, positionCount, - elementType == ElementType.NULL || randomBoolean(), - 0, - between(1, 2), - 0, - between(1, 2) + nullAllowed && (elementType == ElementType.NULL || randomBoolean()), + minValuesPerPosition, + maxValuesPerPosition, + minDupsPerPosition, + maxDupsPerPosition ).block(); } return new CompositeBlock(blocks); @@ -43,7 +54,19 @@ public void testFilter() { final BlockFactory blockFactory = blockFactory(); int numBlocks = randomIntBetween(1, 1000); int positionCount = randomIntBetween(1, 1000); - try (CompositeBlock origComposite = randomCompositeBlock(blockFactory, numBlocks, positionCount)) { + try ( + CompositeBlock origComposite = randomCompositeBlock( + blockFactory, + () -> randomFrom(supportedSubElementTypes), + true, + numBlocks, + positionCount, + 0, + between(1, 2), + 0, + between(1, 2) + ) + ) { int[] selected = new int[randomIntBetween(0, positionCount * 3)]; for (int i = 0; i < selected.length; i++) { selected[i] = randomIntBetween(0, positionCount - 1); @@ -59,4 +82,25 @@ public void testFilter() { } } } + + public void testTotalValueCount() { + final BlockFactory blockFactory = blockFactory(); + int numBlocks = randomIntBetween(1, 1000); + int positionCount = randomIntBetween(1, 1000); + try ( + CompositeBlock composite = randomCompositeBlock( + blockFactory, + () -> randomValueOtherThan(ElementType.NULL, () -> randomFrom(supportedSubElementTypes)), + false, + numBlocks, + positionCount, + 1, + 1, + 0, + 0 + ) + ) { + assertThat(composite.getTotalValueCount(), equalTo(numBlocks * positionCount)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java index 4303137f74bb3..af1463b88c62c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java @@ -20,7 +20,7 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase { public static ValuesSourceReaderOperator.Status simple() { - return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222); + return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222, 1000); } public static String simpleToJson() { @@ -29,6 +29,7 @@ public static String simpleToJson() { "readers_built" : { "ReaderType" : 3 }, + "values_loaded" : 1000, "process_nanos" : 1022323, "process_time" : "1ms", "pages_processed" : 123, @@ -53,6 +54,7 @@ public ValuesSourceReaderOperator.Status createTestInstance() { randomNonNegativeLong(), randomNonNegativeInt(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong() ); } @@ -73,14 +75,16 @@ protected ValuesSourceReaderOperator.Status mutateInstance(ValuesSourceReaderOpe int pagesProcessed = instance.pagesProcessed(); long rowsReceived = instance.rowsReceived(); long rowsEmitted = instance.rowsEmitted(); - switch (between(0, 4)) { + long valuesLoaded = instance.valuesLoaded(); + switch (between(0, 5)) { case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt); case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + case 5 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted); + return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java index 08087f249c19f..c8f8094f69c27 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java @@ -58,6 +58,8 @@ public void testToXContent() { "took_time" : "10micros", "cpu_nanos" : 10000, "cpu_time" : "10micros", + "documents_found" : 222, + "values_loaded" : 1000, "iterations" : 12, "operators" : [ { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java index 3915c9d6a37b8..df3583d0c99bd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java @@ -59,6 +59,8 @@ public void testToXContent() { "last_updated" : "1973-11-29T09:27:23.214Z", "cpu_nanos" : 123213, "cpu_time" : "123.2micros", + "documents_found" : 222, + "values_loaded" : 1000, "iterations" : 55, "status" : "running", "completed_operators" : [ diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java index 791f5dacdce64..4ea413e4fcd3b 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java @@ -159,7 +159,11 @@ private Map runEsql(RestEsqlTestCase.RequestObjectBuilder reques } private void assertResultMap(boolean includeCCSMetadata, Map result, C columns, V values, boolean remoteOnly) { - MapMatcher mapMatcher = getResultMatcher(ccsMetadataAvailable(), result.containsKey("is_partial")); + MapMatcher mapMatcher = getResultMatcher( + ccsMetadataAvailable(), + result.containsKey("is_partial"), + result.containsKey("documents_found") + ); if (includeCCSMetadata) { mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 031c5c5898740..e5820a4bf6a5c 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -659,7 +659,9 @@ private MapMatcher commonProfile() { .entry("cpu_nanos", greaterThan(0L)) .entry("took_nanos", greaterThan(0L)) .entry("operators", instanceOf(List.class)) - .entry("sleeps", matchesMap().extraOk()); + .entry("sleeps", matchesMap().extraOk()) + .entry("documents_found", greaterThanOrEqualTo(0)) + .entry("values_loaded", greaterThanOrEqualTo(0)); } /** @@ -689,7 +691,8 @@ private String checkOperatorProfile(Map o) { .entry("process_nanos", greaterThan(0)) .entry("processed_queries", List.of("*:*")) .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); - case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk()); + case "ValuesSourceReaderOperator" -> basicProfile().entry("values_loaded", greaterThanOrEqualTo(0)) + .entry("readers_built", matchesMap().extraOk()); case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("rows_received", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index ab44347cf1e02..0cb522f1fb84d 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -57,6 +57,7 @@ import static java.util.Map.entry; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; import static org.elasticsearch.test.ListMatcher.matchesList; +import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC; @@ -264,12 +265,19 @@ public static RequestObjectBuilder jsonBuilder() throws IOException { public void testGetAnswer() throws IOException { Map answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2")); - assertEquals(4, answer.size()); + assertEquals(6, answer.size()); assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0)); Map colA = Map.of("name", "a", "type", "integer"); Map colB = Map.of("name", "b", "type", "integer"); - assertEquals(List.of(colA, colB), answer.get("columns")); - assertEquals(List.of(List.of(1, 2)), answer.get("values")); + assertMap( + answer, + matchesMap().entry("took", greaterThanOrEqualTo(0)) + .entry("is_partial", any(Boolean.class)) + .entry("documents_found", 0) + .entry("values_loaded", 0) + .entry("columns", List.of(colA, colB)) + .entry("values", List.of(List.of(1, 2))) + ); } public void testUseUnknownIndex() throws IOException { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index d6ef2bf1ce6a7..e98574926e586 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -135,6 +135,7 @@ public void testTaskContents() throws Exception { matchesMap().entry("pause_me:column_at_a_time:ScriptLongs", greaterThanOrEqualTo(1)) ); assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1)); + assertThat(oStatus.valuesLoaded(), greaterThanOrEqualTo(1L)); valuesSourceReaders++; continue; } @@ -181,6 +182,19 @@ public void testTaskContents() throws Exception { \\_ProjectOperator[projection = [0]] \\_LimitOperator[limit = 1000] \\_OutputOperator[columns = [sum(pause_me)]]""")); + + for (TaskInfo task : dataTasks(foundTasks)) { + assertThat(((DriverStatus) task.status()).documentsFound(), greaterThan(0L)); + assertThat(((DriverStatus) task.status()).valuesLoaded(), greaterThan(0L)); + } + for (TaskInfo task : nodeReduceTasks(foundTasks)) { + assertThat(((DriverStatus) task.status()).documentsFound(), equalTo(0L)); + assertThat(((DriverStatus) task.status()).valuesLoaded(), equalTo(0L)); + } + for (TaskInfo task : coordinatorTasks(foundTasks)) { + assertThat(((DriverStatus) task.status()).documentsFound(), equalTo(0L)); + assertThat(((DriverStatus) task.status()).valuesLoaded(), equalTo(0L)); + } } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index bce31633e2212..7dd679e681798 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -939,6 +939,12 @@ public enum Cap { */ METRICS_COMMAND(Build.current().isSnapshot()), + /** + * Are the {@code documents_found} and {@code values_loaded} fields available + * in the response and profile? + */ + DOCUMENTS_FOUND_AND_VALUES_LOADED, + /** * Index component selector syntax (my-data-stream-name::failures) */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index f1c26fa0174cd..c005414704d0f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -35,6 +35,8 @@ import java.util.Objects; import java.util.Optional; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; + public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse implements ChunkedToXContentObject, @@ -47,6 +49,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action. private final List columns; private final List pages; + private final long documentsFound; + private final long valuesLoaded; private final Profile profile; private final boolean columnar; private final String asyncExecutionId; @@ -58,6 +62,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action. public EsqlQueryResponse( List columns, List pages, + long documentsFound, + long valuesLoaded, @Nullable Profile profile, boolean columnar, @Nullable String asyncExecutionId, @@ -67,6 +73,8 @@ public EsqlQueryResponse( ) { this.columns = columns; this.pages = pages; + this.valuesLoaded = valuesLoaded; + this.documentsFound = documentsFound; this.profile = profile; this.columnar = columnar; this.asyncExecutionId = asyncExecutionId; @@ -78,12 +86,14 @@ public EsqlQueryResponse( public EsqlQueryResponse( List columns, List pages, + long documentsFound, + long valuesLoaded, @Nullable Profile profile, boolean columnar, boolean isAsync, EsqlExecutionInfo executionInfo ) { - this(columns, pages, profile, columnar, null, false, isAsync, executionInfo); + this(columns, pages, documentsFound, valuesLoaded, profile, columnar, null, false, isAsync, executionInfo); } /** @@ -109,6 +119,8 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException { } List columns = in.readCollectionAsList(ColumnInfoImpl::new); List pages = in.readCollectionAsList(Page::new); + long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; + long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { profile = in.readOptionalWriteable(Profile::new); } @@ -117,7 +129,18 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { executionInfo = in.readOptionalWriteable(EsqlExecutionInfo::new); } - return new EsqlQueryResponse(columns, pages, profile, columnar, asyncExecutionId, isRunning, isAsync, executionInfo); + return new EsqlQueryResponse( + columns, + pages, + documentsFound, + valuesLoaded, + profile, + columnar, + asyncExecutionId, + isRunning, + isAsync, + executionInfo + ); } @Override @@ -129,6 +152,10 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeCollection(columns); out.writeCollection(pages); + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + out.writeVLong(documentsFound); + out.writeVLong(valuesLoaded); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { out.writeOptionalWriteable(profile); } @@ -161,6 +188,14 @@ public Iterator column(int columnIndex) { return ResponseValueUtils.valuesForColumn(columnIndex, columns.get(columnIndex).type(), pages); } + public long documentsFound() { + return documentsFound; + } + + public long valuesLoaded() { + return valuesLoaded; + } + public Profile profile() { return profile; } @@ -218,6 +253,12 @@ public Iterator toXContentChunked(ToXContent.Params params tookTime = Collections.emptyIterator(); } + Iterator meta = ChunkedToXContentHelper.chunk((builder, p) -> { + builder.field("documents_found", documentsFound); + builder.field("values_loaded", valuesLoaded); + return builder; + }); + Iterator columnHeadings = dropNullColumns ? Iterators.concat( ResponseXContentUtils.allColumns(columns, "all_columns"), @@ -232,6 +273,7 @@ public Iterator toXContentChunked(ToXContent.Params params ChunkedToXContentHelper.startObject(), asyncPropertiesOrEmpty(), tookTime, + meta, columnHeadings, ChunkedToXContentHelper.array("values", valuesIt), executionInfoRender, @@ -280,6 +322,8 @@ public boolean equals(Object o) { && Objects.equals(isRunning, that.isRunning) && columnar == that.columnar && Iterators.equals(values(), that.values(), (row1, row2) -> Iterators.equals(row1, row2, Objects::equals)) + && documentsFound == that.documentsFound + && valuesLoaded == that.valuesLoaded && Objects.equals(profile, that.profile) && Objects.equals(executionInfo, that.executionInfo); } @@ -290,8 +334,11 @@ public int hashCode() { asyncExecutionId, isRunning, columns, - Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)), columnar, + Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)), + documentsFound, + valuesLoaded, + profile, executionInfo ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java index f896a25317102..4d7565a5d7863 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java @@ -44,6 +44,7 @@ public EsqlExecutionInfo executionInfo() { @Override public EsqlQueryResponse getCurrentResult() { - return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo); + // TODO it'd be nice to have the number of documents we've read from completed drivers here + return new EsqlQueryResponse(List.of(), List.of(), 0, 0, null, false, getExecutionId().getEncoded(), true, true, executionInfo); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 5ad81177a6a44..6938fa300022c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; @@ -75,7 +75,7 @@ void startComputeOnRemoteCluster( RemoteCluster cluster, Runnable cancelQueryOnFailure, EsqlExecutionInfo executionInfo, - ActionListener> listener + ActionListener listener ) { var queryPragmas = configuration.pragmas(); listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); @@ -87,10 +87,10 @@ void startComputeOnRemoteCluster( final boolean receivedResults = finalResponse.get() != null || pagesFetched.get(); if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); - l.onResponse(List.of()); + l.onResponse(DriverCompletionInfo.EMPTY); } else if (configuration.allowPartialResults()) { EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); - l.onResponse(List.of()); + l.onResponse(DriverCompletionInfo.EMPTY); } else { l.onFailure(e); } @@ -118,9 +118,9 @@ void startComputeOnRemoteCluster( onGroupFailure = computeService.cancelQueryOnFailure(groupTask); l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); } - try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> { + try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(completionInfo -> { updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get()); - return profiles; + return completionInfo; }))) { var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); exchangeSource.addRemoteSink( @@ -134,7 +134,7 @@ void startComputeOnRemoteCluster( var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan); final ActionListener clusterListener = computeListener.acquireCompute().map(r -> { finalResponse.set(r); - return r.getProfiles(); + return r.getCompletionInfo(); }); transportService.sendChildRequest( cluster.connection, @@ -287,7 +287,7 @@ void runComputeOnRemoteCluster( cancelQueryOnFailure, computeListener.acquireCompute().map(r -> { finalResponse.set(r); - return r.getProfiles(); + return r.getCompletionInfo(); }) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index c8b8e84fd2478..856f131cb5645 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -10,15 +10,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.compute.EsqlRefCountingListener; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.ResponseHeadersCollector; import org.elasticsearch.core.Releasable; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - /** * A variant of {@link RefCountingListener} with the following differences: * 1. Automatically cancels sub tasks on failure (via runOnTaskFailure) @@ -27,19 +23,18 @@ * 4. Collects failures and returns the most appropriate exception to the caller. */ final class ComputeListener implements Releasable { + private final DriverCompletionInfo.AtomicAccumulator completionInfoAccumulator = new DriverCompletionInfo.AtomicAccumulator(); private final EsqlRefCountingListener refs; - private final List collectedProfiles; private final ResponseHeadersCollector responseHeaders; private final Runnable runOnFailure; - ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener> delegate) { + ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener delegate) { this.runOnFailure = runOnFailure; this.responseHeaders = new ResponseHeadersCollector(threadPool.getThreadContext()); - this.collectedProfiles = Collections.synchronizedList(new ArrayList<>()); // listener that executes after all the sub-listeners refs (created via acquireCompute) have completed this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> { responseHeaders.finish(); - delegate.onResponse(collectedProfiles.stream().toList()); + delegate.onResponse(completionInfoAccumulator.finish()); })); } @@ -60,13 +55,11 @@ ActionListener acquireAvoid() { /** * Acquires a new listener that collects compute result. This listener will also collect warnings emitted during compute */ - ActionListener> acquireCompute() { + ActionListener acquireCompute() { final ActionListener delegate = acquireAvoid(); - return ActionListener.wrap(profiles -> { + return ActionListener.wrap(info -> { responseHeaders.collect(); - if (profiles != null && profiles.isEmpty() == false) { - collectedProfiles.addAll(profiles); - } + completionInfoAccumulator.accumulate(info); delegate.onResponse(null); }, e -> { responseHeaders.collect(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java index 289186ae38e6e..1a9b211d7a487 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.TimeValue; import org.elasticsearch.transport.TransportResponse; @@ -18,11 +19,13 @@ import java.io.IOException; import java.util.List; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; + /** * The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest} */ final class ComputeResponse extends TransportResponse { - private final List profiles; + private final DriverCompletionInfo completionInfo; // for use with ClusterComputeRequests (cross-cluster searches) private final TimeValue took; // overall took time for a specific cluster in a cross-cluster search @@ -32,12 +35,12 @@ final class ComputeResponse extends TransportResponse { public final int failedShards; public final List failures; - ComputeResponse(List profiles) { - this(profiles, null, null, null, null, null, List.of()); + ComputeResponse(DriverCompletionInfo completionInfo) { + this(completionInfo, null, null, null, null, null, List.of()); } ComputeResponse( - List profiles, + DriverCompletionInfo completionInfo, TimeValue took, Integer totalShards, Integer successfulShards, @@ -45,7 +48,7 @@ final class ComputeResponse extends TransportResponse { Integer failedShards, List failures ) { - this.profiles = profiles; + this.completionInfo = completionInfo; this.took = took; this.totalShards = totalShards == null ? 0 : totalShards.intValue(); this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue(); @@ -55,14 +58,16 @@ final class ComputeResponse extends TransportResponse { } ComputeResponse(StreamInput in) throws IOException { - if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { + if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + completionInfo = new DriverCompletionInfo(in); + } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { if (in.readBoolean()) { - profiles = in.readCollectionAsImmutableList(DriverProfile::readFrom); + completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom)); } else { - profiles = null; + completionInfo = DriverCompletionInfo.EMPTY; } } else { - profiles = null; + completionInfo = DriverCompletionInfo.EMPTY; } if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { this.took = in.readOptionalTimeValue(); @@ -87,13 +92,11 @@ final class ComputeResponse extends TransportResponse { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { - if (profiles == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeCollection(profiles); - } + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + completionInfo.writeTo(out); + } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { + out.writeBoolean(true); + out.writeCollection(completionInfo.collectedProfiles()); } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeOptionalTimeValue(took); @@ -108,8 +111,8 @@ public void writeTo(StreamOutput out) throws IOException { } } - public List getProfiles() { - return profiles; + public DriverCompletionInfo getCompletionInfo() { + return completionInfo; } public TimeValue getTook() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ec5334fc44cf1..4e91e1d505791 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -19,7 +19,7 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.Driver; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.compute.operator.exchange.ExchangeService; @@ -304,10 +304,14 @@ public void executePlan( ); updateShardCountForCoordinatorOnlyQuery(execInfo); try ( - var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { - updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo); - return new Result(physicalPlan.output(), collectedPages, profiles, execInfo); - })) + var computeListener = new ComputeListener( + transportService.getThreadPool(), + cancelQueryOnFailure, + listener.map(completionInfo -> { + updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo); + return new Result(physicalPlan.output(), collectedPages, completionInfo, execInfo); + }) + ) ) { runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute()); return; @@ -336,10 +340,16 @@ public void executePlan( ); listener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId)); exchangeService.addExchangeSourceHandler(sessionId, exchangeSource); - try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { - execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements - return new Result(outputAttributes, collectedPages, profiles, execInfo); - }))) { + try ( + var computeListener = new ComputeListener( + transportService.getThreadPool(), + cancelQueryOnFailure, + listener.map(completionInfo -> { + execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements + return new Result(outputAttributes, collectedPages, completionInfo, execInfo); + }) + ) + ) { try (Releasable ignored = exchangeSource.addEmptySink()) { // run compute on the coordinator final AtomicBoolean localClusterWasInterrupted = new AtomicBoolean(); @@ -347,7 +357,7 @@ public void executePlan( var localListener = new ComputeListener( transportService.getThreadPool(), cancelQueryOnFailure, - computeListener.acquireCompute().delegateFailure((l, profiles) -> { + computeListener.acquireCompute().delegateFailure((l, completionInfo) -> { if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) { execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> { var tookTime = execInfo.tookSoFar(); @@ -362,7 +372,7 @@ public void executePlan( return builder.build(); }); } - l.onResponse(profiles); + l.onResponse(completionInfo); }) ) ) { @@ -405,7 +415,7 @@ public void executePlan( .setFailures(r.failures) .build() ); - dataNodesListener.onResponse(r.getProfiles()); + dataNodesListener.onResponse(r.getCompletionInfo()); }, e -> { if (configuration.allowPartialResults()) { execInfo.swapCluster( @@ -414,7 +424,7 @@ public void executePlan( EsqlExecutionInfo.Cluster.Status.PARTIAL ).setFailures(List.of(new ShardSearchFailure(e))).build() ); - dataNodesListener.onResponse(List.of()); + dataNodesListener.onResponse(DriverCompletionInfo.EMPTY); } else { dataNodesListener.onFailure(e); } @@ -489,7 +499,7 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn } } - void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener> listener) { + void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener listener) { listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts())); List contexts = new ArrayList<>(context.searchContexts().size()); for (int i = 0; i < context.searchContexts().size(); i++) { @@ -552,9 +562,9 @@ public SourceProvider createSourceProvider() { } ActionListener listenerCollectingStatus = listener.map(ignored -> { if (context.configuration().profile()) { - return drivers.stream().map(Driver::profile).toList(); + return DriverCompletionInfo.includingProfiles(drivers); } else { - return List.of(); + return DriverCompletionInfo.excludingProfiles(drivers); } }); listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index d8aa9ea3e258a..59da28fae7279 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -16,7 +16,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; @@ -191,7 +191,7 @@ protected void sendRequest( TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> { nodeResponseRef.set(r); - return r.profiles(); + return r.completionInfo(); }), DataNodeComputeResponse::new, esqlExecutor) ); } @@ -250,15 +250,15 @@ private void runBatch(int startBatchIndex) { final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size()); final AtomicInteger pagesProduced = new AtomicInteger(); List shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex); - ActionListener> batchListener = new ActionListener<>() { - final ActionListener> ref = computeListener.acquireCompute(); + ActionListener batchListener = new ActionListener<>() { + final ActionListener ref = computeListener.acquireCompute(); @Override - public void onResponse(List result) { + public void onResponse(DriverCompletionInfo info) { try { onBatchCompleted(endBatchIndex); } finally { - ref.onResponse(result); + ref.onResponse(info); } } @@ -268,7 +268,7 @@ public void onFailure(Exception e) { for (ShardId shardId : shardIds) { addShardLevelFailure(shardId, e); } - onResponse(List.of()); + onResponse(DriverCompletionInfo.EMPTY); } else { // TODO: add these to fatal failures so we can continue processing other shards. try { @@ -282,7 +282,7 @@ public void onFailure(Exception e) { acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ESQL_WORKER_THREAD_POOL_NAME); if (searchContexts.isEmpty()) { - batchListener.onResponse(List.of()); + batchListener.onResponse(DriverCompletionInfo.EMPTY); return; } var computeContext = new ComputeContext( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java index 1313db9e70449..4de26e4034d81 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java @@ -9,52 +9,63 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -import java.util.List; import java.util.Map; -import java.util.Objects; + +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; /** * The compute result of {@link DataNodeRequest} */ final class DataNodeComputeResponse extends TransportResponse { - private final List profiles; + private final DriverCompletionInfo completionInfo; private final Map shardLevelFailures; - DataNodeComputeResponse(List profiles, Map shardLevelFailures) { - this.profiles = profiles; + DataNodeComputeResponse(DriverCompletionInfo completionInfo, Map shardLevelFailures) { + this.completionInfo = completionInfo; this.shardLevelFailures = shardLevelFailures; } DataNodeComputeResponse(StreamInput in) throws IOException { + if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + this.completionInfo = new DriverCompletionInfo(in); + this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); + return; + } if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) { - this.profiles = in.readCollectionAsImmutableList(DriverProfile::readFrom); + this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom)); this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); - } else { - this.profiles = Objects.requireNonNullElse(new ComputeResponse(in).getProfiles(), List.of()); - this.shardLevelFailures = Map.of(); + return; } + this.completionInfo = new ComputeResponse(in).getCompletionInfo(); + this.shardLevelFailures = Map.of(); } @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) { + completionInfo.writeTo(out); + out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException); + return; + } if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) { - out.writeCollection(profiles, (o, v) -> v.writeTo(o)); + out.writeCollection(completionInfo.collectedProfiles(), (o, v) -> v.writeTo(o)); out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException); - } else { - if (shardLevelFailures.isEmpty() == false) { - throw new IllegalStateException("shard level failures are not supported in old versions"); - } - new ComputeResponse(profiles).writeTo(out); + return; + } + if (shardLevelFailures.isEmpty() == false) { + throw new IllegalStateException("shard level failures are not supported in old versions"); } + new ComputeResponse(completionInfo).writeTo(out); } - List profiles() { - return profiles; + public DriverCompletionInfo completionInfo() { + return completionInfo; } Map shardLevelFailures() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index d2ba7ddc11792..b27e74867885d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -21,7 +21,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -111,17 +111,23 @@ final void startComputeOnDataNodes( ) { final long startTimeInNanos = System.nanoTime(); searchShards(requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> { - try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> { - return new ComputeResponse( - profiles, - TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos), - targetShards.totalShards(), - targetShards.totalShards() - shardFailures.size() - skippedShards.get(), - targetShards.skippedShards() + skippedShards.get(), - shardFailures.size(), - selectFailures() - ); - }))) { + try ( + var computeListener = new ComputeListener( + transportService.getThreadPool(), + runOnTaskFailure, + listener.map( + completionInfo -> new ComputeResponse( + completionInfo, + TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos), + targetShards.totalShards(), + targetShards.totalShards() - shardFailures.size() - skippedShards.get(), + targetShards.skippedShards() + skippedShards.get(), + shardFailures.size(), + selectFailures() + ) + ) + ) + ) { for (TargetShard shard : targetShards.shards.values()) { for (DiscoveryNode node : shard.remainingNodes) { nodePermits.putIfAbsent(node, new Semaphore(1)); @@ -234,15 +240,15 @@ private List selectFailures() { } private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) { - final ActionListener> listener = computeListener.acquireCompute(); + final ActionListener listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { - void onAfter(List profiles) { + void onAfter(DriverCompletionInfo info) { nodePermits.get(request.node).release(); if (concurrentRequests != null) { concurrentRequests.release(); } trySendingRequestsForPendingShards(targetShards, computeListener); - listener.onResponse(profiles); + listener.onResponse(info); } @Override @@ -258,7 +264,7 @@ public void onResponse(DataNodeComputeResponse response) { trackShardLevelFailure(shardId, false, e.getValue()); pendingShardIds.add(shardId); } - onAfter(response.profiles()); + onAfter(response.completionInfo()); } @Override @@ -267,7 +273,7 @@ public void onFailure(Exception e, boolean receivedData) { trackShardLevelFailure(shardId, receivedData, e); pendingShardIds.add(shardId); } - onAfter(List.of()); + onAfter(DriverCompletionInfo.EMPTY); } @Override @@ -276,7 +282,7 @@ public void onSkip() { if (rootTask.isCancelled()) { onFailure(new TaskCancelledException("null"), true); } else { - onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } } }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index cd67682a5ea7a..176545b705fe3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -330,7 +330,9 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config } return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes); }).toList(); - EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null; + EsqlQueryResponse.Profile profile = configuration.profile() + ? new EsqlQueryResponse.Profile(result.completionInfo().collectedProfiles()) + : null; threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0"); if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { String asyncExecutionId = asyncTask.getExecutionId().getEncoded(); @@ -338,6 +340,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config return new EsqlQueryResponse( columns, result.pages(), + result.completionInfo().documentsFound(), + result.completionInfo().documentsFound(), profile, request.columnar(), asyncExecutionId, @@ -346,7 +350,16 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config result.executionInfo() ); } - return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async(), result.executionInfo()); + return new EsqlQueryResponse( + columns, + result.pages(), + result.completionInfo().documentsFound(), + result.completionInfo().valuesLoaded(), + profile, + request.columnar(), + request.async(), + result.executionInfo() + ); } /** @@ -398,6 +411,8 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) { return new EsqlQueryResponse( List.of(), List.of(), + 0, + 0, null, false, asyncExecutionId, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index fd20911184ae8..b604832ffd14d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; @@ -75,7 +76,7 @@ abstract static class CssPartialErrorsActionListener implements ActionListener 0) { // code-path to execute subplans - executeSubPlan(new ArrayList<>(), physicalPlan, iterator, executionInfo, runner, listener); + executeSubPlan(new DriverCompletionInfo.Accumulator(), physicalPlan, iterator, executionInfo, runner, listener); } else { // execute main plan runner.run(physicalPlan, listener); @@ -247,7 +247,7 @@ private void executeSubPlans( } private void executeSubPlan( - List profileAccumulator, + DriverCompletionInfo.Accumulator completionInfoAccumulator, PhysicalPlan plan, Iterator subPlanIterator, EsqlExecutionInfo executionInfo, @@ -258,7 +258,7 @@ private void executeSubPlan( runner.run(tuple.physical, listener.delegateFailureAndWrap((next, result) -> { try { - profileAccumulator.addAll(result.profiles()); + completionInfoAccumulator.accumulate(result.completionInfo()); LocalRelation resultWrapper = resultToPlan(tuple.logical, result); // replace the original logical plan with the backing result @@ -274,12 +274,14 @@ private void executeSubPlan( if (subPlanIterator.hasNext() == false) { runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> { - profileAccumulator.addAll(finalResult.profiles()); - finalListener.onResponse(new Result(finalResult.schema(), finalResult.pages(), profileAccumulator, executionInfo)); + completionInfoAccumulator.accumulate(finalResult.completionInfo()); + finalListener.onResponse( + new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo) + ); })); } else { // continue executing the subplans - executeSubPlan(profileAccumulator, newPlan, subPlanIterator, executionInfo, runner, next); + executeSubPlan(completionInfoAccumulator, newPlan, subPlanIterator, executionInfo, runner, next); } } finally { Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks))); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java index 4f90893c759b8..5da8a53e53f15 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java @@ -9,7 +9,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.core.expression.Attribute; @@ -23,10 +23,12 @@ * that was run. Each {@link Page} contains a {@link Block} of values for each * attribute in this list. * @param pages Actual values produced by running the ESQL. - * @param profiles {@link DriverProfile}s from all drivers that ran to produce the output. These - * are quite cheap to build, so we build them for all ESQL runs, regardless of if - * users have asked for them. But we only include them in the results if users ask - * for them. + * @param completionInfo Information collected from drivers after they've been completed. * @param executionInfo Metadata about the execution of this query. Used for cross cluster queries. */ -public record Result(List schema, List pages, List profiles, @Nullable EsqlExecutionInfo executionInfo) {} +public record Result( + List schema, + List pages, + DriverCompletionInfo completionInfo, + @Nullable EsqlExecutionInfo executionInfo +) {} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index ddec9d56282f8..7ab45f805c754 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Driver; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverRunner; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; @@ -728,6 +729,9 @@ protected void start(Driver driver, ActionListener driverListener) { } }; listener = ActionListener.releaseAfter(listener, () -> Releasables.close(drivers)); - runner.runToCompletion(drivers, listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, List.of(), null))); + runner.runToCompletion( + drivers, + listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, DriverCompletionInfo.EMPTY, null)) + ); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 4347fcc8a59e7..ccab9d54a96a1 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.InstantiatingObjectParser; import org.elasticsearch.xcontent.ObjectParser; @@ -129,7 +130,18 @@ EsqlQueryResponse randomResponseAsync(boolean columnar, EsqlQueryResponse.Profil id = randomAlphaOfLengthBetween(1, 16); isRunning = randomBoolean(); } - return new EsqlQueryResponse(columns, values, profile, columnar, id, isRunning, async, createExecutionInfo()); + return new EsqlQueryResponse( + columns, + values, + randomNonNegativeLong(), + randomNonNegativeLong(), + profile, + columnar, + id, + isRunning, + async, + createExecutionInfo() + ); } EsqlExecutionInfo createExecutionInfo() { @@ -255,58 +267,42 @@ protected EsqlQueryResponse mutateInstance(EsqlQueryResponse instance) { allNull = false; } } - return switch (allNull ? between(0, 2) : between(0, 3)) { + List columns = instance.columns(); + List pages = deepCopyOfPages(instance); + long documentsFound = instance.documentsFound(); + long valuesLoaded = instance.valuesLoaded(); + EsqlQueryResponse.Profile profile = instance.profile(); + boolean columnar = instance.columnar(); + boolean isAsync = instance.isAsync(); + EsqlExecutionInfo executionInfo = instance.getExecutionInfo(); + switch (allNull ? between(0, 4) : between(0, 5)) { case 0 -> { int mutCol = between(0, instance.columns().size() - 1); - List cols = new ArrayList<>(instance.columns()); + columns = new ArrayList<>(instance.columns()); // keep the type the same so the values are still valid but change the name - cols.set( - mutCol, - new ColumnInfoImpl(cols.get(mutCol).name() + "mut", cols.get(mutCol).type(), cols.get(mutCol).originalTypes()) - ); - yield new EsqlQueryResponse( - cols, - deepCopyOfPages(instance), - instance.profile(), - instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); + ColumnInfoImpl mut = columns.get(mutCol); + columns.set(mutCol, new ColumnInfoImpl(mut.name() + "mut", mut.type(), mut.originalTypes())); } - case 1 -> new EsqlQueryResponse( - instance.columns(), - deepCopyOfPages(instance), - instance.profile(), - false == instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); - case 2 -> new EsqlQueryResponse( - instance.columns(), - deepCopyOfPages(instance), - randomValueOtherThan(instance.profile(), this::randomProfile), - instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); - case 3 -> { + case 1 -> documentsFound = randomValueOtherThan(documentsFound, ESTestCase::randomNonNegativeLong); + case 2 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); + case 3 -> columnar = false == columnar; + case 4 -> profile = randomValueOtherThan(profile, this::randomProfile); + case 5 -> { + assert allNull == false + : "can't replace values while preserving types if all pages are null - the only valid values are null"; int noPages = instance.pages().size(); List differentPages = List.of(); do { differentPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks)); differentPages = randomList(noPages, noPages, () -> randomPage(instance.columns())); } while (differentPages.equals(instance.pages())); - yield new EsqlQueryResponse( - instance.columns(), - differentPages, - instance.profile(), - instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); + pages.forEach(Page::releaseBlocks); + pages = differentPages; } default -> throw new IllegalArgumentException(); - }; + } + ; + return new EsqlQueryResponse(columns, pages, documentsFound, valuesLoaded, profile, columnar, isAsync, executionInfo); } private List deepCopyOfPages(EsqlQueryResponse response) { @@ -358,6 +354,8 @@ public static class ResponseBuilder { ObjectParser.ValueType.BOOLEAN_OR_NULL ); parser.declareInt(constructorArg(), new ParseField("took")); + parser.declareLong(constructorArg(), new ParseField("documents_found")); + parser.declareLong(constructorArg(), new ParseField("values_loaded")); parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfoImpl.fromXContent(p), new ParseField("columns")); parser.declareField(constructorArg(), (p, c) -> p.list(), new ParseField("values"), ObjectParser.ValueType.OBJECT_ARRAY); parser.declareObject(optionalConstructorArg(), (p, c) -> parseClusters(p), new ParseField("_clusters")); @@ -372,6 +370,8 @@ public ResponseBuilder( @Nullable String asyncExecutionId, Boolean isRunning, Integer took, + long documentsFound, + long valuesLoaded, List columns, List> values, EsqlExecutionInfo executionInfo @@ -380,6 +380,8 @@ public ResponseBuilder( this.response = new EsqlQueryResponse( columns, List.of(valuesToPage(TestBlockFactory.getNonBreakingInstance(), columns, values)), + documentsFound, + valuesLoaded, null, false, asyncExecutionId, @@ -574,62 +576,154 @@ public void testChunkResponseSizeColumnar() { try (EsqlQueryResponse resp = randomResponse(true, null)) { int columnCount = resp.pages().get(0).getBlockCount(); int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; - assertChunkCount(resp, r -> 5 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); + assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); } try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) { int columnCount = resp.pages().get(0).getBlockCount(); int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; - assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); // is_running + assertChunkCount(resp, r -> 7 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); // is_running } } public void testChunkResponseSizeRows() { try (EsqlQueryResponse resp = randomResponse(false, null)) { int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum(); - assertChunkCount(resp, r -> 5 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); + assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); } try (EsqlQueryResponse resp = randomResponseAsync(false, null, true)) { int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum(); - assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); + assertChunkCount(resp, r -> 7 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); } } public void testSimpleXContentColumnar() { try (EsqlQueryResponse response = simple(true)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40, + 80 + ] + ] + }""")); } } public void testSimpleXContentColumnarDropNulls() { try (EsqlQueryResponse response = simple(true)) { assertThat( - Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))), + Strings.toString( + wrapAsToXContent(response), + new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")), + true, + false + ), equalTo(""" - {"all_columns":[{"name":"foo","type":"integer"}],"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""") + { + "documents_found" : 3, + "values_loaded" : 100, + "all_columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40, + 80 + ] + ] + }""") ); } } public void testSimpleXContentColumnarAsync() { try (EsqlQueryResponse response = simple(true, true)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "is_running" : false, + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40, + 80 + ] + ] + }""")); } } public void testSimpleXContentRows() { try (EsqlQueryResponse response = simple(false)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""")); } } public void testSimpleXContentRowsAsync() { try (EsqlQueryResponse response = simple(false, true)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "is_running" : false, + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""")); } } @@ -638,6 +732,8 @@ public void testBasicXContentIdAndRunning() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())), + 10, + 99, null, false, "id-123", @@ -646,8 +742,27 @@ public void testBasicXContentIdAndRunning() { null ) ) { - assertThat(Strings.toString(response), equalTo(""" - {"id":"id-123","is_running":true,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "id" : "id-123", + "is_running" : true, + "documents_found" : 10, + "values_loaded" : 99, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""")); } } @@ -656,6 +771,8 @@ public void testXContentOriginalTypes() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "unsupported", List.of("foo", "bar"))), List.of(new Page(blockFactory.newConstantNullBlock(2))), + 1, + 1, null, false, null, @@ -664,8 +781,29 @@ public void testXContentOriginalTypes() { null ) ) { - assertThat(Strings.toString(response), equalTo(""" - {"columns":[{"name":"foo","type":"unsupported","original_types":["foo","bar"]}],"values":[[null],[null]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "documents_found" : 1, + "values_loaded" : 1, + "columns" : [ + { + "name" : "foo", + "type" : "unsupported", + "original_types" : [ + "foo", + "bar" + ] + } + ], + "values" : [ + [ + null + ], + [ + null + ] + ] + }""")); } } @@ -674,6 +812,8 @@ public void testNullColumnsXContentDropNulls() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("all_null", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), blockFactory.newConstantNullBlock(2))), + 1, + 3, null, false, null, @@ -683,11 +823,41 @@ public void testNullColumnsXContentDropNulls() { ) ) { assertThat( - Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))), - equalTo("{" + """ - "all_columns":[{"name":"foo","type":"integer"},{"name":"all_null","type":"integer"}],""" + """ - "columns":[{"name":"foo","type":"integer"}],""" + """ - "values":[[40],[80]]}""") + Strings.toString( + wrapAsToXContent(response), + new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")), + true, + false + ), + equalTo(""" + { + "documents_found" : 1, + "values_loaded" : 3, + "all_columns" : [ + { + "name" : "foo", + "type" : "integer" + }, + { + "name" : "all_null", + "type" : "integer" + } + ], + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""") ); } } @@ -704,6 +874,8 @@ public void testNullColumnsFromBuilderXContentDropNulls() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("all_null", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), b.build())), + 1, + 3, null, false, null, @@ -713,11 +885,41 @@ public void testNullColumnsFromBuilderXContentDropNulls() { ) ) { assertThat( - Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))), - equalTo("{" + """ - "all_columns":[{"name":"foo","type":"integer"},{"name":"all_null","type":"integer"}],""" + """ - "columns":[{"name":"foo","type":"integer"}],""" + """ - "values":[[40],[80]]}""") + Strings.toString( + wrapAsToXContent(response), + new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")), + true, + false + ), + equalTo(""" + { + "documents_found" : 1, + "values_loaded" : 3, + "all_columns" : [ + { + "name" : "foo", + "type" : "integer" + }, + { + "name" : "all_null", + "type" : "integer" + } + ], + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""") ); } } @@ -731,6 +933,8 @@ private EsqlQueryResponse simple(boolean columnar, boolean async) { return new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())), + 3, + 100, null, columnar, async, @@ -743,6 +947,8 @@ public void testProfileXContent() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())), + 10, + 100, new EsqlQueryResponse.Profile( List.of( new DriverProfile( @@ -764,8 +970,10 @@ public void testProfileXContent() { null ); ) { - assertThat(Strings.toString(response, true, false), equalTo(""" + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" { + "documents_found" : 10, + "values_loaded" : 100, "columns" : [ { "name" : "foo", @@ -790,6 +998,8 @@ public void testProfileXContent() { "stop_millis" : 1723489819929, "took_nanos" : 20021, "cpu_nanos" : 20000, + "documents_found" : 0, + "values_loaded" : 0, "iterations" : 12, "operators" : [ { @@ -827,7 +1037,7 @@ public void testColumns() { var longBlk2 = blockFactory.newLongArrayVector(new long[] { 300L, 400L, 500L }, 3).asBlock(); var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("bar", "long", null)); var pages = List.of(new Page(intBlk1, longBlk1), new Page(intBlk2, longBlk2)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { assertThat(columnValues(response.column(0)), contains(10, 20, 30, 40, 50)); assertThat(columnValues(response.column(1)), contains(100L, 200L, 300L, 400L, 500L)); expectThrows(IllegalArgumentException.class, () -> response.column(-1)); @@ -839,7 +1049,7 @@ public void testColumnsIllegalArg() { var intBlk1 = blockFactory.newIntArrayVector(new int[] { 10 }, 1).asBlock(); var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null)); var pages = List.of(new Page(intBlk1)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { expectThrows(IllegalArgumentException.class, () -> response.column(-1)); expectThrows(IllegalArgumentException.class, () -> response.column(1)); } @@ -858,7 +1068,7 @@ public void testColumnsWithNull() { } var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null)); var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { assertThat(columnValues(response.column(0)), contains(10, null, 30, null, null, 60, null, 80, 90, null)); expectThrows(IllegalArgumentException.class, () -> response.column(-1)); expectThrows(IllegalArgumentException.class, () -> response.column(2)); @@ -878,7 +1088,7 @@ public void testColumnsWithMultiValue() { } var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null)); var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { assertThat(columnValues(response.column(0)), contains(List.of(10, 20), null, List.of(40, 50), null, 70, 80, null)); expectThrows(IllegalArgumentException.class, () -> response.column(-1)); expectThrows(IllegalArgumentException.class, () -> response.column(2)); @@ -891,7 +1101,7 @@ public void testRowValues() { List columns = randomList(numColumns, numColumns, this::randomColumnInfo); int noPages = randomIntBetween(1, 20); List pages = randomList(noPages, noPages, () -> randomPage(columns)); - try (var resp = new EsqlQueryResponse(columns, pages, null, false, "", false, false, null)) { + try (var resp = new EsqlQueryResponse(columns, pages, 0, 0, null, false, "", false, false, null)) { var rowValues = getValuesList(resp.rows()); var valValues = getValuesList(resp.values()); for (int i = 0; i < rowValues.size(); i++) { @@ -989,5 +1199,4 @@ static Page valuesToPage(BlockFactory blockFactory, List columns } return new Page(results.stream().map(Block.Builder::build).toArray(Block[]::new)); } - } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java index 723f6c89f62cd..9c45c998ac96a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java @@ -246,7 +246,7 @@ public void testPlainTextEmptyCursorWithColumns() { public void testPlainTextEmptyCursorWithoutColumns() { assertEquals( StringUtils.EMPTY, - getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), null, false, false, null))) + getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), 0, 0, null, false, false, null))) ); } @@ -269,7 +269,16 @@ public void testTsvFormatWithDropNullColumns() { } private static EsqlQueryResponse emptyData() { - return new EsqlQueryResponse(singletonList(new ColumnInfoImpl("name", "keyword", null)), emptyList(), null, false, false, null); + return new EsqlQueryResponse( + singletonList(new ColumnInfoImpl("name", "keyword", null)), + emptyList(), + 0, + 0, + null, + false, + false, + null + ); } private static EsqlQueryResponse regularData() { @@ -303,7 +312,7 @@ private static EsqlQueryResponse regularData() { ) ); - return new EsqlQueryResponse(headers, values, null, false, false, null); + return new EsqlQueryResponse(headers, values, 0, 0, null, false, false, null); } private static EsqlQueryResponse escapedData() { @@ -327,7 +336,7 @@ private static EsqlQueryResponse escapedData() { ) ); - return new EsqlQueryResponse(headers, values, null, false, false, null); + return new EsqlQueryResponse(headers, values, 0, 0, null, false, false, null); } private static RestRequest req() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java index ec9bb14d2a265..91456c4f44893 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java @@ -79,6 +79,8 @@ public class TextFormatterTests extends ESTestCase { blockFactory.newConstantNullBlock(2) ) ), + 0, + 0, null, randomBoolean(), randomBoolean(), @@ -181,6 +183,8 @@ public void testFormatWithoutHeader() { blockFactory.newConstantNullBlock(2) ) ), + 0, + 0, null, randomBoolean(), randomBoolean(), @@ -222,6 +226,8 @@ public void testVeryLongPadding() { .build() ) ), + 0, + 0, null, randomBoolean(), randomBoolean(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index 5ec78fd8efbee..cac20924ed3b4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; import org.elasticsearch.core.TimeValue; @@ -34,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -56,7 +58,7 @@ public void shutdownTransportService() { terminate(threadPool); } - private List randomProfiles() { + private DriverCompletionInfo randomCompletionInfo() { int numProfiles = randomIntBetween(0, 2); List profiles = new ArrayList<>(numProfiles); for (int i = 0; i < numProfiles; i++) { @@ -75,20 +77,22 @@ private List randomProfiles() { ) ); } - return profiles; + return new DriverCompletionInfo(randomNonNegativeLong(), randomNonNegativeLong(), profiles); } public void testEmpty() { - PlainActionFuture> results = new PlainActionFuture<>(); + PlainActionFuture results = new PlainActionFuture<>(); try (var ignored = new ComputeListener(threadPool, () -> {}, results)) { assertFalse(results.isDone()); } assertTrue(results.isDone()); - assertThat(results.actionGet(10, TimeUnit.SECONDS), empty()); + assertThat(results.actionGet(10, TimeUnit.SECONDS).collectedProfiles(), empty()); } public void testCollectComputeResults() { - PlainActionFuture> future = new PlainActionFuture<>(); + PlainActionFuture future = new PlainActionFuture<>(); + long documentsFound = 0; + long valuesLoaded = 0; List allProfiles = new ArrayList<>(); AtomicInteger onFailure = new AtomicInteger(); try (var computeListener = new ComputeListener(threadPool, onFailure::incrementAndGet, future)) { @@ -102,20 +106,24 @@ public void testCollectComputeResults() { threadPool.generic() ); } else { - var profiles = randomProfiles(); - allProfiles.addAll(profiles); - ActionListener> subListener = computeListener.acquireCompute(); + var info = randomCompletionInfo(); + documentsFound += info.documentsFound(); + valuesLoaded += info.valuesLoaded(); + allProfiles.addAll(info.collectedProfiles()); + ActionListener subListener = computeListener.acquireCompute(); threadPool.schedule( - ActionRunnable.wrap(subListener, l -> l.onResponse(profiles)), + ActionRunnable.wrap(subListener, l -> l.onResponse(info)), TimeValue.timeValueNanos(between(0, 100)), threadPool.generic() ); } } } - List profiles = future.actionGet(10, TimeUnit.SECONDS); + DriverCompletionInfo actual = future.actionGet(10, TimeUnit.SECONDS); + assertThat(actual.documentsFound(), equalTo(documentsFound)); + assertThat(actual.valuesLoaded(), equalTo(valuesLoaded)); assertThat( - profiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), + actual.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum))) ); assertThat(onFailure.get(), equalTo(0)); @@ -129,13 +137,13 @@ public void testCancelOnFailure() { ); int successTasks = between(1, 50); int failedTasks = between(1, 100); - PlainActionFuture> rootListener = new PlainActionFuture<>(); + PlainActionFuture rootListener = new PlainActionFuture<>(); final AtomicInteger onFailure = new AtomicInteger(); try (var computeListener = new ComputeListener(threadPool, onFailure::incrementAndGet, rootListener)) { for (int i = 0; i < successTasks; i++) { - ActionListener> subListener = computeListener.acquireCompute(); + ActionListener subListener = computeListener.acquireCompute(); threadPool.schedule( - ActionRunnable.wrap(subListener, l -> l.onResponse(randomProfiles())), + ActionRunnable.wrap(subListener, l -> l.onResponse(randomCompletionInfo())), TimeValue.timeValueNanos(between(0, 100)), threadPool.generic() ); @@ -160,13 +168,17 @@ public void testCancelOnFailure() { } public void testCollectWarnings() throws Exception { + AtomicLong documentsFound = new AtomicLong(); + AtomicLong valuesLoaded = new AtomicLong(); List allProfiles = new ArrayList<>(); Map> allWarnings = new HashMap<>(); - ActionListener> rootListener = new ActionListener<>() { + ActionListener rootListener = new ActionListener<>() { @Override - public void onResponse(List result) { + public void onResponse(DriverCompletionInfo result) { + assertThat(result.documentsFound(), equalTo(documentsFound.get())); + assertThat(result.valuesLoaded(), equalTo(valuesLoaded.get())); assertThat( - result.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), + result.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum))) ); Map> responseHeaders = threadPool.getThreadContext() @@ -201,8 +213,10 @@ public void onFailure(Exception e) { threadPool.generic() ); } else { - var resp = randomProfiles(); - allProfiles.addAll(resp); + var resp = randomCompletionInfo(); + documentsFound.addAndGet(resp.documentsFound()); + valuesLoaded.addAndGet(resp.valuesLoaded()); + allProfiles.addAll(resp.collectedProfiles()); int numWarnings = randomIntBetween(1, 5); Map warnings = new HashMap<>(); for (int i = 0; i < numWarnings; i++) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index 0ca5d8f79ca8d..ae09d270d6f3c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.test.ComputeTestCase; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; @@ -123,7 +124,7 @@ public void testOnePass() { Queue sent = ConcurrentCollections.newQueue(); var future = sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> { sent.add(nodeRequest(node, shardIds)); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); }); safeGet(future); assertThat(sent.size(), equalTo(2)); @@ -142,7 +143,7 @@ public void testMissingShards() { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); var future = sendRequests(targetShards, true, -1, (node, shardIds, aliasFilters, listener) -> { assertThat(shard3, not(in(shardIds))); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); }); ComputeResponse resp = safeGet(future); assertThat(resp.totalShards, equalTo(3)); @@ -173,7 +174,7 @@ public void testRetryThenSuccess() { if (node.equals(node4) && shardIds.contains(shard2)) { failures.put(shard2, new IOException("test")); } - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), failures))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures))); }); try { future.actionGet(1, TimeUnit.MINUTES); @@ -203,7 +204,7 @@ public void testRetryButFail() { if (shardIds.contains(shard5)) { failures.put(shard5, new IOException("test failure for shard5")); } - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), failures))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures))); }); var error = expectThrows(Exception.class, future::actionGet); assertNotNull(ExceptionsHelper.unwrap(error, IOException.class)); @@ -226,7 +227,7 @@ public void testDoNotRetryOnRequestLevelFailure() { if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); } else { - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); } }); Exception exception = expectThrows(Exception.class, future::actionGet); @@ -245,7 +246,7 @@ public void testAllowPartialResults() { if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); } else { - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); } }); var response = safeGet(future); @@ -265,7 +266,7 @@ public void testNonFatalErrorIsRetriedOnAnotherShard() { if (Objects.equals(node1, node)) { runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false)); } else { - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); } })); assertThat(response.totalShards, equalTo(1)); @@ -323,7 +324,7 @@ public void testLimitConcurrentNodes() { sent.add(nodeRequest(node, shardIds)); runWithDelay(() -> { concurrentRequests.decrementAndGet(); - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); }); })); assertThat(sent.size(), equalTo(5)); @@ -346,7 +347,7 @@ public void testSkipNodes() { var response = safeGet(sendRequests(targetShards, randomBoolean(), 1, (node, shardIds, aliasFilters, listener) -> { runWithDelay(() -> { if (processed.incrementAndGet() == 1) { - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } else { listener.onSkip(); } @@ -368,7 +369,7 @@ public void testSkipRemovesPriorNonFatalErrors() { if (Objects.equals(node.getId(), node1.getId()) && shardIds.equals(List.of(shard1))) { listener.onFailure(new RuntimeException("test request level non fatal failure"), false); } else if (Objects.equals(node.getId(), node3.getId()) && shardIds.equals(List.of(shard2))) { - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } else if (Objects.equals(node.getId(), node2.getId()) && shardIds.equals(List.of(shard1))) { listener.onSkip(); } @@ -393,7 +394,7 @@ public void testQueryHotShardsFirst() { var sent = Collections.synchronizedList(new ArrayList()); safeGet(sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> { sent.add(node.getId()); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); })); assertThat(sent, equalTo(List.of("node-1", "node-2", "node-3", "node-4"))); } @@ -406,7 +407,7 @@ public void testQueryHotShardsFirstWhenIlmMovesShard() { var sent = ConcurrentCollections.newQueue(); safeGet(sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> { sent.add(nodeRequest(node, shardIds)); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); })); assertThat(take(sent, 1), containsInAnyOrder(nodeRequest(node1, shard1))); assertThat(take(sent, 1), anyOf(contains(nodeRequest(node2, shard2)), contains(nodeRequest(warmNode2, shard2)))); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java index add3bf77efb00..b23517dd14088 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.SlowLogFieldProvider; @@ -105,7 +106,7 @@ public void testPrioritiesOnSuccess() { for (int i = 0; i < actualTook.length; i++) { EsqlExecutionInfo warnQuery = getEsqlExecutionInfo(actualTook[i], actualPlanningTook[i]); - queryLog.onQueryPhase(new Result(List.of(), List.of(), List.of(), warnQuery), query); + queryLog.onQueryPhase(new Result(List.of(), List.of(), DriverCompletionInfo.EMPTY, warnQuery), query); if (expectedLevel[i] != null) { assertThat(appender.lastEvent(), is(not(nullValue()))); var msg = (ESLogMessage) appender.lastMessage(); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml index 96145e84ad2cd..99beec034b0f7 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml @@ -169,7 +169,7 @@ setup: - match: {values.0.1: 40} --- -"Basic ESQL query": +basic: - do: esql.query: body: @@ -181,12 +181,66 @@ setup: - match: {values.0: [1, 1]} --- -"Test From Eval Sort Limit": +basic with documents_found: + - requires: + test_runner_features: [capabilities, contains] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [documents_found_and_values_loaded] + reason: "checks for documents_found and values_loaded" + + - do: + esql.query: + body: + query: 'from test | keep data | sort data | limit 2' + columnar: true + + - match: {documents_found: 10} # two documents per shard + - match: {values_loaded: 10} # one per document + - match: {columns.0.name: "data"} + - match: {columns.0.type: "long"} + - match: {values.0: [1, 1]} + +--- +FROM EVAL SORT LIMIT: + - do: + esql.query: + body: + query: 'from test | eval x = count + 7 | sort x | limit 1' + + - match: {columns.0.name: "color"} + - match: {columns.1.name: "count"} + - match: {columns.2.name: "count_d"} + - match: {columns.3.name: "data"} + - match: {columns.4.name: "data_d"} + - match: {columns.5.name: "time"} + - match: {columns.6.name: "x"} + - match: {values.0.6: 47} + - length: {values: 1} + +--- +FROM EVAL SORT LIMIT with documents_found: + - requires: + test_runner_features: [capabilities, contains] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [documents_found_and_values_loaded] + reason: "checks for documents_found and values_loaded" + - do: esql.query: body: query: 'from test | eval x = count + 7 | sort x | limit 1' + - match: {documents_found: 40} + # We can't be sure quite how many values we'll load. It's at least + # one per document in the index. And one per top document. But we + # might load more values because we run in more threads. + - gte: {values_loaded: 45} - match: {columns.0.name: "color"} - match: {columns.1.name: "count"} - match: {columns.2.name: "count_d"} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml index 17034de677b8d..a7b0c48b93ebe 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml @@ -1,9 +1,5 @@ --- setup: - - requires: - cluster_features: ["gte_v8.12.0"] - reason: "profile option added in 8.12" - test_runner_features: warnings - do: indices.create: index: test @@ -140,3 +136,29 @@ avg 8.14 or after: - gte: {profile.drivers.1.took_nanos: 0} - gte: {profile.drivers.1.cpu_nanos: 0} # It's hard to assert much about these because they don't come back in any particular order. + +--- +documents found: + - requires: + test_runner_features: [capabilities, contains] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [documents_found_and_values_loaded] + reason: "checks for documents_found and values_loaded" + + - do: + esql.query: + body: + query: 'FROM test | LIMIT 1' + profile: true + + - length: {profile.drivers: 3} + - match: {profile.drivers.0.operators.0.operator: /ExchangeSourceOperator|LuceneSourceOperator.+/} + - gte: {profile.drivers.0.documents_found: 0} + - gte: {profile.drivers.0.values_loaded: 0} + - gte: {profile.drivers.1.documents_found: 0} + - gte: {profile.drivers.1.values_loaded: 0} + - gte: {profile.drivers.2.documents_found: 0} + - gte: {profile.drivers.2.values_loaded: 0}