Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ static TransportVersion def(int id) {
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES = def(9_087_0_00);
public static final TransportVersion JOIN_ON_ALIASES = def(9_088_0_00);
public static final TransportVersion ILM_ADD_SKIP_SETTING = def(9_089_0_00);
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_090_0_00);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had said to auto-backport this, but it needs a transport version change. I should have known it does.

I think it's still worth it. Imagine someone giving us an 8.19 profile a year from now - we'll be happy that you added the plan....


/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2713,9 +2713,11 @@ public static Request newXContentRequest(HttpMethod method, String endpoint, ToX
}

protected static MapMatcher getProfileMatcher() {
return matchesMap().entry("query", instanceOf(Map.class))
return matchesMap() //
.entry("query", instanceOf(Map.class))
.entry("planning", instanceOf(Map.class))
.entry("drivers", instanceOf(List.class));
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class));
}

protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -24,23 +25,34 @@
* <strong>roughly</strong> the number of documents times the number of
* fields per document. Except {@code null} values don't count.
* And multivalued fields count as many times as there are values.
* @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* @param driverProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* not free so this will be empty if the {@code profile} option was not set in
* the request.
*/
public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable {
public record DriverCompletionInfo(
long documentsFound,
long valuesLoaded,
List<DriverProfile> driverProfiles,
List<PlanProfile> planProfiles
) implements Writeable {

/**
* Completion info we use when we didn't properly complete any drivers.
* Usually this is returned with an error, but it's also used when receiving
* responses from very old nodes.
*/
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of());
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of(), List.of());

/**
* Build a {@link DriverCompletionInfo} for many drivers including their profile output.
*/
public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
public static DriverCompletionInfo includingProfiles(
List<Driver> drivers,
String description,
String clusterName,
String nodeName,
String planTree
) {
long documentsFound = 0;
long valuesLoaded = 0;
List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size());
Expand All @@ -52,7 +64,12 @@ public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
}
collectedProfiles.add(p);
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
return new DriverCompletionInfo(
documentsFound,
valuesLoaded,
collectedProfiles,
List.of(new PlanProfile(description, clusterName, nodeName, planTree))
);
}

/**
Expand All @@ -69,49 +86,63 @@ public static DriverCompletionInfo excludingProfiles(List<Driver> drivers) {
valuesLoaded += o.valuesLoaded();
}
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of());
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of(), List.of());
}

public DriverCompletionInfo(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom));
public static DriverCompletionInfo readFrom(StreamInput in) throws IOException {
return new DriverCompletionInfo(
in.readVLong(),
in.readVLong(),
in.readCollectionAsImmutableList(DriverProfile::readFrom),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
: List.of()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeVLong(documentsFound);
    out.writeVLong(valuesLoaded);
    out.writeCollection(driverProfiles);
    // Older receivers don't understand plan profiles; only send them when the
    // destination's transport version includes ESQL_PROFILE_INCLUDE_PLAN.
    if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
        out.writeCollection(planProfiles);
    }
}

/**
 * Merges many {@link DriverCompletionInfo}s into one. Not safe for
 * concurrent use; see {@code AtomicAccumulator} for the thread-safe variant.
 */
public static class Accumulator {
    private long documentsFound;
    private long valuesLoaded;
    private final List<DriverProfile> driverProfiles = new ArrayList<>();
    private final List<PlanProfile> planProfiles = new ArrayList<>();

    public void accumulate(DriverCompletionInfo info) {
        this.documentsFound += info.documentsFound;
        this.valuesLoaded += info.valuesLoaded;
        this.driverProfiles.addAll(info.driverProfiles);
        this.planProfiles.addAll(info.planProfiles);
    }

    /** Snapshot everything accumulated so far into an immutable result. */
    public DriverCompletionInfo finish() {
        return new DriverCompletionInfo(documentsFound, valuesLoaded, driverProfiles, planProfiles);
    }
}

/**
 * Thread-safe variant of {@link Accumulator}: counters are atomic and the
 * profile lists are synchronized.
 */
public static class AtomicAccumulator {
    private final AtomicLong documentsFound = new AtomicLong();
    private final AtomicLong valuesLoaded = new AtomicLong();
    // Renamed from collectedProfiles for consistency with Accumulator and the
    // record component name.
    private final List<DriverProfile> driverProfiles = Collections.synchronizedList(new ArrayList<>());
    private final List<PlanProfile> planProfiles = Collections.synchronizedList(new ArrayList<>());

    public void accumulate(DriverCompletionInfo info) {
        this.documentsFound.addAndGet(info.documentsFound);
        this.valuesLoaded.addAndGet(info.valuesLoaded);
        this.driverProfiles.addAll(info.driverProfiles);
        this.planProfiles.addAll(info.planProfiles);
    }

    /** Snapshot everything accumulated so far into an immutable result. */
    public DriverCompletionInfo finish() {
        return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), driverProfiles, planProfiles);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
 * Profile of one physical plan that ran as part of a query, rendered into the
 * {@code plans} array of the response's {@code profile} section.
 *
 * @param description label for the plan (e.g. {@code data}, {@code node_reduce}, {@code final})
 * @param clusterName name of the cluster the plan ran on
 * @param nodeName name of the node the plan ran on
 * @param planTree string rendering of the plan tree, emitted as the {@code plan} field
 */
public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject {

    /** Deserialize; fields are read in declaration order, matching {@link #writeTo}. */
    public static PlanProfile readFrom(StreamInput in) throws IOException {
        return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString());
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Write order must match readFrom.
        out.writeString(description);
        out.writeString(clusterName);
        out.writeString(nodeName);
        out.writeString(planTree);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return builder.startObject()
            .field("description", description)
            .field("cluster_name", clusterName)
            .field("node_name", nodeName)
            .field("plan", planTree)
            .endObject();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ private void testPushQuery(String value, String esqlQuery, List<String> luceneQu
result,
getResultMatcher(result).entry(
"profile",
matchesMap().entry("drivers", instanceOf(List.class))
matchesMap() //
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ private void testQuery(Double percent, String query, int documentsFound, boolean
matchesMap().entry("documents_found", documentsFound)
.entry(
"profile",
matchesMap().entry("drivers", instanceOf(List.class))
matchesMap() //
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -122,7 +123,7 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0;
long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0;
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
profile = in.readOptionalWriteable(Profile::new);
profile = in.readOptionalWriteable(Profile::readFrom);
}
boolean columnar = in.readBoolean();
EsqlExecutionInfo executionInfo = null;
Expand Down Expand Up @@ -286,13 +287,19 @@ private Iterator<ToXContent> profileRenderer(ToXContent.Params params) {
if (profile == null) {
return Collections.emptyIterator();
}
return Iterators.concat(ChunkedToXContentHelper.startObject("profile"), ChunkedToXContentHelper.chunk((b, p) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should do like:

List<Iterator> all = new ArrayList<>();
all.add(ChunkedToXContentHelper.startObject("profile"));
if (executionInfo != null) {
  all.add(ChunkedToXContentHelper.chunk((b, p) -> b.field("query", executionInfo.overallTimeSpan()),field("planning", executionInfo.planningTimeSpan());
}
...

That keeps everything on one line nicely and means expansions don't make big changes to the layout. And the ArrayList isn't a big allocation - just a bunch of pointers in it.

if (executionInfo != null) {
b.field("query", executionInfo.overallTimeSpan());
b.field("planning", executionInfo.planningTimeSpan());
}
return b;
}), ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params), ChunkedToXContentHelper.endObject());
return Iterators.concat(
ChunkedToXContentHelper.startObject("profile"), //
ChunkedToXContentHelper.chunk((b, p) -> {
if (executionInfo != null) {
b.field("query", executionInfo.overallTimeSpan());
b.field("planning", executionInfo.planningTimeSpan());
}
return b;
}),
ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params),
ChunkedToXContentHelper.array("plans", profile.plans.iterator()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds following section:

  "plans": [
    {
      "description": "data",
      "cluster_name": "runTask",
      "node_name": "runTask-0",
      "plan_tree": "ExchangeSinkExec[[language_code{f}#2],false]\n\\_ProjectExec[[language_code{f}#2]]\n  \\_FieldExtractExec[language_code{f}#2]<[],[]>\n    \\_EsQueryExec[data], indexMode[standard], query[][_doc{f}#3], limit[1000], sort[] estimatedRowSize[12]"
    },
    {
      "description": "node_reduce",
      "cluster_name": "runTask",
      "node_name": "runTask-0",
      "plan_tree": "ExchangeSinkExec[[language_code{f}#2],false]\n\\_ExchangeSourceExec[[language_code{f}#2],false]"
    },
    {
      "description": "data",
      "cluster_name": "runTask",
      "node_name": "runTask-1",
      "plan_tree": "ExchangeSinkExec[[language_code{f}#1],false]\n\\_ProjectExec[[language_code{f}#1]]\n  \\_FieldExtractExec[language_code{f}#1]<[],[]>\n    \\_EsQueryExec[data], indexMode[standard], query[][_doc{f}#2], limit[1000], sort[] estimatedRowSize[12]"
    },
    {
      "description": "node_reduce",
      "cluster_name": "runTask",
      "node_name": "runTask-1",
      "plan_tree": "ExchangeSinkExec[[language_code{f}#1],false]\n\\_LimitExec[1000[INTEGER],8]\n  \\_ExchangeSourceExec[[language_code{f}#1],false]"
    },
    {
      "description": "final",
      "cluster_name": "runTask",
      "node_name": "runTask-0",
      "plan_tree": "OutputExec[org.elasticsearch.xpack.esql.plugin.ComputeService$$Lambda/0x000000000c464e08@35b2b181]\n\\_LimitExec[1000[INTEGER],8]\n  \\_ExchangeSourceExec[[language_code{f}#2],false]"
    }
  ]

json is not particularly great when displaying multi-line strings. I wonder if there is any trick that could help us make tree a bit more readable? May be display array of lines instead of songle line with breaks?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be fine with breaking on \n into an array. It's easier to read.

I'd also be fine keeping the big string. It's quite honest about what's really there.

I suppose the best thing would be to build some kind of object tree structure. The plan tree rendering code already does some of that - but it's optimized for string-y-ness. It'll, like, discard long stuff too. And we don't need that in this json response. At least, not for readability. It'd be bad to have a 40mb plan. But I don't think this is worth doing in this PR. It's a bunch more work and either the array of lines or the giant string is good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan-tree is actually an ASCII-art rendering of a tree structure. We should instead export that tree as a JSON tree (with parent-child nesting). I was planning to do this based on feedback from the work I did for query plan visualization. Costin pointed out that there was precedent in the QL code-base for exporting to graphviz. We do not need to go and support graphviz, but leave something as specific as that to external tools. We should at least print the plan tree as JSON instead of ASCII art. This would be a good improvement.

ChunkedToXContentHelper.endObject()
);
}

public boolean[] nullColumns() {
Expand Down Expand Up @@ -396,41 +403,23 @@ public EsqlResponse responseInternal() {
return esqlResponse;
}

public static class Profile implements Writeable {
private final List<DriverProfile> drivers;
public record Profile(List<DriverProfile> drivers, List<PlanProfile> plans) implements Writeable {

public Profile(List<DriverProfile> drivers) {
this.drivers = drivers;
}

public Profile(StreamInput in) throws IOException {
this.drivers = in.readCollectionAsImmutableList(DriverProfile::readFrom);
public static Profile readFrom(StreamInput in) throws IOException {
return new Profile(
in.readCollectionAsImmutableList(DriverProfile::readFrom),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
: List.of()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(drivers);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
out.writeCollection(plans);
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Profile profile = (Profile) o;
return Objects.equals(drivers, profile.drivers);
}

@Override
public int hashCode() {
return Objects.hash(drivers);
}

List<DriverProfile> drivers() {
return drivers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ final class ComputeResponse extends TransportResponse {

ComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
completionInfo = new DriverCompletionInfo(in);
completionInfo = DriverCompletionInfo.readFrom(in);
} else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
if (in.readBoolean()) {
completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom));
completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
} else {
completionInfo = DriverCompletionInfo.EMPTY;
}
Expand Down Expand Up @@ -96,7 +96,7 @@ public void writeTo(StreamOutput out) throws IOException {
completionInfo.writeTo(out);
} else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeBoolean(true);
out.writeCollection(completionInfo.collectedProfiles());
out.writeCollection(completionInfo.driverProfiles());
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeOptionalTimeValue(took);
Expand Down
Loading