-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Add documents_found and values_loaded
#125631
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
90de284
4875c5e
fb36e7e
b5bf239
7d306c0
ce44024
aebd712
75cef17
12aafc4
62b5008
add4368
9335c3e
5067e28
4d6e8db
fa3487e
28e9fa4
62a6d3e
4201ae5
3011344
79f7c39
fbbe29c
0953790
25ec61d
a2a9a9e
4e3136a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 125631 | ||
| summary: Add `documents_found` and `values_loaded` | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,8 @@ | |
| import java.util.function.IntFunction; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static org.elasticsearch.TransportVersions.ESQL_VALUES_LOADED; | ||
|
|
||
| /** | ||
| * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} | ||
| * and outputs them to a new column. | ||
|
|
@@ -113,6 +115,7 @@ public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceL | |
| private final BlockFactory blockFactory; | ||
|
|
||
| private final Map<String, Integer> readersBuilt = new TreeMap<>(); | ||
| private long valuesLoaded; | ||
|
|
||
| int lastShard = -1; | ||
| int lastSegment = -1; | ||
|
|
@@ -165,6 +168,9 @@ public int get(int i) { | |
| } | ||
| } | ||
| success = true; | ||
| for (Block b : blocks) { | ||
| valuesLoaded += b.getTotalValueCount(); | ||
| } | ||
| return page.appendBlocks(blocks); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException(e); | ||
|
|
@@ -547,7 +553,7 @@ public String toString() { | |
|
|
||
| @Override | ||
| protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { | ||
| return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted); | ||
| return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); | ||
| } | ||
|
|
||
| public static class Status extends AbstractPageMappingOperator.Status { | ||
|
|
@@ -558,21 +564,38 @@ public static class Status extends AbstractPageMappingOperator.Status { | |
| ); | ||
|
|
||
| private final Map<String, Integer> readersBuilt; | ||
|
|
||
| Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { | ||
| private final long valuesLoaded; | ||
|
|
||
| Status( | ||
| Map<String, Integer> readersBuilt, | ||
| long processNanos, | ||
| int pagesProcessed, | ||
| long rowsReceived, | ||
| long rowsEmitted, | ||
| long valuesLoaded | ||
| ) { | ||
| super(processNanos, pagesProcessed, rowsReceived, rowsEmitted); | ||
| this.readersBuilt = readersBuilt; | ||
| this.valuesLoaded = valuesLoaded; | ||
| } | ||
|
|
||
| Status(StreamInput in) throws IOException { | ||
| super(in); | ||
| readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); | ||
| if (in.getTransportVersion().onOrAfter(ESQL_VALUES_LOADED)) { | ||
|
||
| valuesLoaded = in.readVLong(); | ||
| } else { | ||
| valuesLoaded = 0; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| super.writeTo(out); | ||
| out.writeMap(readersBuilt, StreamOutput::writeVInt); | ||
| if (out.getTransportVersion().onOrAfter(ESQL_VALUES_LOADED)) { | ||
| out.writeVLong(valuesLoaded); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -584,6 +607,11 @@ public Map<String, Integer> readersBuilt() { | |
| return readersBuilt; | ||
| } | ||
|
|
||
| @Override | ||
| public long valuesLoaded() { | ||
| return valuesLoaded; | ||
| } | ||
|
|
||
| @Override | ||
| public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
| builder.startObject(); | ||
|
|
@@ -592,6 +620,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |
| builder.field(e.getKey(), e.getValue()); | ||
| } | ||
| builder.endObject(); | ||
| builder.field("values_loaded", valuesLoaded); | ||
| innerToXContent(builder); | ||
| return builder.endObject(); | ||
| } | ||
|
|
@@ -600,12 +629,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |
| public boolean equals(Object o) { | ||
| if (super.equals(o) == false) return false; | ||
| Status status = (Status) o; | ||
| return readersBuilt.equals(status.readersBuilt); | ||
| return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(super.hashCode(), readersBuilt); | ||
| return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -704,6 +733,4 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int | |
| return factory.newAggregateMetricDoubleBlockBuilder(count); | ||
| } | ||
| } | ||
|
|
||
| // TODO tests that mix source loaded fields and doc values in the same block | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.compute.operator; | ||
|
|
||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| /** | ||
| * Information returned when one of more {@link Driver}s is completed. | ||
| * @param documentsFound The number of documents found by all lucene queries performed by these drivers. | ||
| * @param valuesLoaded The number of values loaded from lucene for all drivers. | ||
|
||
| * @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but | ||
| * not free so this will be empty if the {@code profile} option was not set in | ||
| * the request. | ||
| */ | ||
| public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable { | ||
|
|
||
| /** | ||
| * Completion info we use when we didn't properly complete any drivers. | ||
| * Usually this is returned with an error, but it's also used when receiving | ||
| * responses from very old nodes. | ||
| */ | ||
| public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of()); | ||
|
|
||
| /** | ||
| * Build a {@link DriverCompletionInfo} for many drivers including their profile output. | ||
| */ | ||
| public static DriverCompletionInfo includingProfiles(List<Driver> drivers) { | ||
| long documentsFound = 0; | ||
| long valuesLoaded = 0; | ||
| List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size()); | ||
| for (Driver d : drivers) { | ||
| DriverProfile p = d.profile(); | ||
| for (OperatorStatus o : p.operators()) { | ||
| documentsFound += o.documentsFound(); | ||
| valuesLoaded += o.valuesLoaded(); | ||
| } | ||
| collectedProfiles.add(p); | ||
| } | ||
| return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); | ||
| } | ||
|
|
||
| /** | ||
| * Build a {@link DriverCompletionInfo} for many drivers excluding their profile output. | ||
| */ | ||
| public static DriverCompletionInfo excludingProfiles(List<Driver> drivers) { | ||
| long documentsFound = 0; | ||
GalLalouche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| long valuesLoaded = 0; | ||
| for (Driver d : drivers) { | ||
| DriverStatus s = d.status(); | ||
| assert s.status() == DriverStatus.Status.DONE; | ||
| for (OperatorStatus o : s.completedOperators()) { | ||
| documentsFound += o.documentsFound(); | ||
| valuesLoaded += o.valuesLoaded(); | ||
| } | ||
| } | ||
| return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of()); | ||
| } | ||
|
|
||
| public DriverCompletionInfo(StreamInput in) throws IOException { | ||
| this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom)); | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeVLong(documentsFound); | ||
| out.writeVLong(valuesLoaded); | ||
| out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o)); | ||
| } | ||
|
|
||
| public static class Accumulator { | ||
| private long documentsFound; | ||
| private long valuesLoaded; | ||
| private final List<DriverProfile> collectedProfiles = new ArrayList<>(); | ||
|
|
||
| public void accumulate(DriverCompletionInfo info) { | ||
| this.documentsFound += info.documentsFound; | ||
| this.valuesLoaded += info.valuesLoaded; | ||
| this.collectedProfiles.addAll(info.collectedProfiles); | ||
| } | ||
|
|
||
| public DriverCompletionInfo finish() { | ||
| return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); | ||
| } | ||
| } | ||
|
|
||
| public static class AtomicAccumulator { | ||
| private final AtomicLong documentsFound = new AtomicLong(); | ||
| private final AtomicLong valuesLoaded = new AtomicLong(); | ||
| private final List<DriverProfile> collectedProfiles = Collections.synchronizedList(new ArrayList<>()); | ||
|
|
||
| public void accumulate(DriverCompletionInfo info) { | ||
| this.documentsFound.addAndGet(info.documentsFound); | ||
| this.valuesLoaded.addAndGet(info.valuesLoaded); | ||
| this.collectedProfiles.addAll(info.collectedProfiles); | ||
| } | ||
|
|
||
| public DriverCompletionInfo finish() { | ||
| return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -124,6 +124,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |
| if (builder.humanReadable()) { | ||
| builder.field("cpu_time", TimeValue.timeValueNanos(cpuNanos)); | ||
| } | ||
| builder.field("documents_found", documentsFound()); | ||
| builder.field("values_loaded", valuesLoaded()); | ||
| builder.field("iterations", iterations); | ||
| builder.field("status", status, params); | ||
| builder.startArray("completed_operators"); | ||
|
|
@@ -145,6 +147,24 @@ public String toString() { | |
| return Strings.toString(this); | ||
| } | ||
|
|
||
| /** | ||
| * The number of documents found by this driver. | ||
| */ | ||
| public long documentsFound() { | ||
| long documentsFound = completedOperators.stream().mapToLong(OperatorStatus::documentsFound).sum(); | ||
|
||
| documentsFound += activeOperators.stream().mapToLong(OperatorStatus::documentsFound).sum(); | ||
| return documentsFound; | ||
| } | ||
|
|
||
| /** | ||
| * The number of values loaded by this operator. | ||
| */ | ||
| public long valuesLoaded() { | ||
| long valuesLoaded = completedOperators.stream().mapToLong(OperatorStatus::valuesLoaded).sum(); | ||
| valuesLoaded += activeOperators.stream().mapToLong(OperatorStatus::valuesLoaded).sum(); | ||
| return valuesLoaded; | ||
| } | ||
|
|
||
| public enum Status implements Writeable, ToXContentFragment { | ||
| QUEUED, | ||
| STARTING, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -105,5 +105,21 @@ interface OperatorFactory extends Describable { | |
| /** | ||
| * Status of an {@link Operator} to be returned by the tasks API. | ||
| */ | ||
| interface Status extends ToXContentObject, VersionedNamedWriteable {} | ||
| interface Status extends ToXContentObject, VersionedNamedWriteable { | ||
| /** | ||
| * The number of documents found by this operator. Most operators | ||
| * don't find documents and will return {@code 0} here. | ||
| */ | ||
| default long documentsFound() { | ||
| return 0; | ||
| } | ||
|
|
||
| /** | ||
| * The number of values loaded by this operator. Most operators | ||
| * don't load values and will return {@code 0} here. | ||
| */ | ||
| default long valuesLoaded() { | ||
| return 0; | ||
| } | ||
| } | ||
|
Contributor comment (collapsed): Not related to this change, but I think it is worth revisiting operator status. I believe we should replace it with a simple map (or maybe a wrapper on top of the map). |
||
| } | ||
Reviewer comment (collapsed): Should this be changed to `long`, or are we not worried about overflows here?
Author reply (collapsed): Overflow would be a bug in planning. That'd be us using billions of values in a single block.