Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
19b95b4
top level plumbing; work in progress
not-napoleon Mar 13, 2025
1e795fc
collected profiles object. WIP, this still doesn't compile
not-napoleon Mar 14, 2025
fa82be2
make collected profiles writable. Still WIP
not-napoleon Mar 14, 2025
838cf28
clean up a few more usages
not-napoleon Mar 14, 2025
bd1567c
Everything Compiles!
not-napoleon Mar 14, 2025
873f2d7
[CI] Auto commit changes from spotless
Mar 14, 2025
9a09236
Profile serialization
not-napoleon Mar 14, 2025
df7f07d
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 14, 2025
f28aa9e
Just use Profie instead of creating a new class
not-napoleon Mar 17, 2025
704d51a
Use profile instead of explicit lists
not-napoleon Mar 17, 2025
ac2efcd
Merge branch 'main' into esql-planner-profile
not-napoleon Mar 17, 2025
c7b782d
fix tests
not-napoleon Mar 17, 2025
73ddcb7
[CI] Auto commit changes from spotless
Mar 17, 2025
62da773
plumb planner profile through to the rule runner
not-napoleon Mar 17, 2025
9c1536c
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 17, 2025
41ac4d4
push the profile one layer deeper into the runner
not-napoleon Mar 18, 2025
9bc9550
add new parameters everywhere
not-napoleon Mar 18, 2025
0530d40
[CI] Auto commit changes from spotless
Mar 18, 2025
aa6b085
rule layout draft
not-napoleon Mar 20, 2025
a8ad96d
Merge branch 'main' into esql-planner-profile
not-napoleon Mar 27, 2025
845c431
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 27, 2025
6658b96
[CI] Auto commit changes from spotless
Mar 27, 2025
b9c07ee
fix some merge errors
not-napoleon Mar 27, 2025
229c11d
lay out data structures for profile
not-napoleon Mar 31, 2025
6987ac0
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 31, 2025
5dd319d
[CI] Auto commit changes from spotless
Mar 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0);
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE = def(9_030_00_0);
public static final TransportVersion ESQL_PLANNER_PROFILE = def(9_031_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -346,18 +348,48 @@ public EsqlResponse responseInternal() {

public static class Profile implements Writeable, ChunkedToXContentObject {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: I wonder if we could make this a record?
This way we could avoid custom equals/hashCode/toString.
Merge could be updated to return a new instance instead of mutating this one

private final List<DriverProfile> drivers;
private final List<PlannerProfile> plannerProfile;

public Profile(List<DriverProfile> drivers) {
public static final Profile EMPTY = new Profile(List.of(), List.of());

public Profile() {
this.drivers = new ArrayList<>();
this.plannerProfile = new ArrayList<>();
}

public Profile(List<DriverProfile> drivers, List<PlannerProfile> plannerProfile) {
this.drivers = drivers;
this.plannerProfile = plannerProfile;
}

public Profile(StreamInput in) throws IOException {
this.drivers = in.readCollectionAsImmutableList(DriverProfile::readFrom);
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
this.plannerProfile = in.readCollectionAsImmutableList(PlannerProfile::readFrom);
} else {
this.plannerProfile = List.of();
}
}

public void merge(Profile other) {
this.drivers.addAll(other.drivers);
this.plannerProfile.addAll(other.plannerProfile);
}

public List<DriverProfile> getDriverProfiles() {
return drivers;
}

public List<PlannerProfile> getPlannerProfiles() {
return plannerProfile;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(drivers);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
out.writeCollection(plannerProfile);
}
}

@Override
Expand All @@ -369,12 +401,12 @@ public boolean equals(Object o) {
return false;
}
Profile profile = (Profile) o;
return Objects.equals(drivers, profile.drivers);
return Objects.equals(drivers, profile.drivers) && Objects.equals(plannerProfile, profile.plannerProfile);
}

@Override
public int hashCode() {
return Objects.hash(drivers);
return Objects.hash(drivers, plannerProfile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;

/**
* Stores profiling information about the query plan. This can be the top level planning on the coordinating node, or the local
* planning on the data nodes.
*/
public class PlannerProfile implements Writeable, ChunkedToXContentObject {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is still work in progress, but I would suggest we try to design it to be a record from the very beginning


public static final PlannerProfile EMPTY = new PlannerProfile(false);

private final boolean isLocalPlanning;

public static PlannerProfile readFrom(StreamInput in) throws IOException {
boolean isLocalPlanning = in.readBoolean();
return new PlannerProfile(isLocalPlanning);
}

public PlannerProfile(boolean isLocalPlanning) {
this.isLocalPlanning = isLocalPlanning;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isLocalPlanning);
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
// NOCOMMIT
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
Expand All @@ -27,6 +26,7 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;
Expand Down Expand Up @@ -75,7 +75,7 @@ void startComputeOnRemoteCluster(
RemoteCluster cluster,
Runnable cancelQueryOnFailure,
EsqlExecutionInfo executionInfo,
ActionListener<List<DriverProfile>> listener
ActionListener<EsqlQueryResponse.Profile> listener
) {
var queryPragmas = configuration.pragmas();
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
Expand All @@ -87,10 +87,10 @@ void startComputeOnRemoteCluster(
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
l.onResponse(List.of());
l.onResponse(EsqlQueryResponse.Profile.EMPTY);
} else if (configuration.allowPartialResults()) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
l.onResponse(List.of());
l.onResponse(EsqlQueryResponse.Profile.EMPTY);
} else {
l.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -29,17 +31,21 @@
final class ComputeListener implements Releasable {
private final EsqlRefCountingListener refs;
private final List<DriverProfile> collectedProfiles;
private final List<PlannerProfile> collectedPlannerProfiles;
private final ResponseHeadersCollector responseHeaders;
private final Runnable runOnFailure;

ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener<List<DriverProfile>> delegate) {
ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener<EsqlQueryResponse.Profile> delegate) {
this.runOnFailure = runOnFailure;
this.responseHeaders = new ResponseHeadersCollector(threadPool.getThreadContext());
this.collectedProfiles = Collections.synchronizedList(new ArrayList<>());
this.collectedPlannerProfiles = Collections.synchronizedList(new ArrayList<>());
// listener that executes after all the sub-listeners refs (created via acquireCompute) have completed
this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> {
responseHeaders.finish();
delegate.onResponse(collectedProfiles.stream().toList());
delegate.onResponse(
new EsqlQueryResponse.Profile(collectedProfiles.stream().toList(), collectedPlannerProfiles.stream().toList())
);
}));
}

Expand All @@ -60,12 +66,18 @@ ActionListener<Void> acquireAvoid() {
/**
* Acquires a new listener that collects compute result. This listener will also collect warnings emitted during compute
*/
ActionListener<List<DriverProfile>> acquireCompute() {
ActionListener<EsqlQueryResponse.Profile> acquireCompute() {
final ActionListener<Void> delegate = acquireAvoid();
return ActionListener.wrap(profiles -> {
responseHeaders.collect();
if (profiles != null && profiles.isEmpty() == false) {
collectedProfiles.addAll(profiles);
if (profiles != null) {
// TODO: move profile merging onto profile object
if (profiles.getDriverProfiles().isEmpty() == false) {
collectedProfiles.addAll(profiles.getDriverProfiles());
}
if (profiles.getPlannerProfiles().isEmpty() == false) {
collectedPlannerProfiles.addAll(profiles.getPlannerProfiles());
}
}
delegate.onResponse(null);
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.io.IOException;
import java.util.List;
Expand All @@ -22,7 +23,7 @@
* The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest}
*/
final class ComputeResponse extends TransportResponse {
private final List<DriverProfile> profiles;
private final EsqlQueryResponse.Profile profiles;

// for use with ClusterComputeRequests (cross-cluster searches)
private final TimeValue took; // overall took time for a specific cluster in a cross-cluster search
Expand All @@ -32,12 +33,12 @@ final class ComputeResponse extends TransportResponse {
public final int failedShards;
public final List<ShardSearchFailure> failures;

ComputeResponse(List<DriverProfile> profiles) {
ComputeResponse(EsqlQueryResponse.Profile profiles) {
this(profiles, null, null, null, null, null, List.of());
}

ComputeResponse(
List<DriverProfile> profiles,
EsqlQueryResponse.Profile profiles,
TimeValue took,
Integer totalShards,
Integer successfulShards,
Expand All @@ -57,7 +58,11 @@ final class ComputeResponse extends TransportResponse {
ComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
if (in.readBoolean()) {
profiles = in.readCollectionAsImmutableList(DriverProfile::readFrom);
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
profiles = new EsqlQueryResponse.Profile(in);
} else {
profiles = new EsqlQueryResponse.Profile(in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
}
} else {
profiles = null;
}
Expand Down Expand Up @@ -91,7 +96,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeCollection(profiles);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
profiles.writeTo(out);
} else {
out.writeCollection(profiles.getDriverProfiles());
}
}
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
Expand All @@ -106,7 +115,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public List<DriverProfile> getProfiles() {
public EsqlQueryResponse.Profile getProfiles() {
return profiles;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
Expand All @@ -42,6 +41,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
Expand All @@ -52,6 +52,7 @@
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.Result;
Expand Down Expand Up @@ -280,7 +281,7 @@ public void execute(
EsqlExecutionInfo.Cluster.Status.PARTIAL
).setFailures(List.of(new ShardSearchFailure(e))).build()
);
dataNodesListener.onResponse(List.of());
dataNodesListener.onResponse(EsqlQueryResponse.Profile.EMPTY);
} else {
dataNodesListener.onFailure(e);
}
Expand Down Expand Up @@ -340,7 +341,7 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn
}
}

void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<List<DriverProfile>> listener) {
void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<EsqlQueryResponse.Profile> listener) {
listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());
for (int i = 0; i < context.searchContexts().size(); i++) {
Expand All @@ -359,6 +360,7 @@ public SourceProvider createSourceProvider() {
);
}
final List<Driver> drivers;
final PlannerProfile localPlannerProfile = new PlannerProfile(true);
try {
LocalExecutionPlanner planner = new LocalExecutionPlanner(
context.sessionId(),
Expand Down Expand Up @@ -397,9 +399,9 @@ public SourceProvider createSourceProvider() {
}
ActionListener<Void> listenerCollectingStatus = listener.map(ignored -> {
if (context.configuration().profile()) {
return drivers.stream().map(Driver::profile).toList();
return new EsqlQueryResponse.Profile(drivers.stream().map(Driver::profile).toList(), List.of(localPlannerProfile));
} else {
return List.of();
return EsqlQueryResponse.Profile.EMPTY;
}
});
listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers));
Expand Down
Loading