Commits (26)
19b95b4
top level plumbing; work in progress
not-napoleon Mar 13, 2025
1e795fc
collected profiles object. WIP, this still doesn't compile
not-napoleon Mar 14, 2025
fa82be2
make collected profiles writable. Still WIP
not-napoleon Mar 14, 2025
838cf28
clean up a few more usages
not-napoleon Mar 14, 2025
bd1567c
Everything Compiles!
not-napoleon Mar 14, 2025
873f2d7
[CI] Auto commit changes from spotless
Mar 14, 2025
9a09236
Profile serialization
not-napoleon Mar 14, 2025
df7f07d
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 14, 2025
f28aa9e
Just use Profile instead of creating a new class
not-napoleon Mar 17, 2025
704d51a
Use profile instead of explicit lists
not-napoleon Mar 17, 2025
ac2efcd
Merge branch 'main' into esql-planner-profile
not-napoleon Mar 17, 2025
c7b782d
fix tests
not-napoleon Mar 17, 2025
73ddcb7
[CI] Auto commit changes from spotless
Mar 17, 2025
62da773
plumb planner profile through to the rule runner
not-napoleon Mar 17, 2025
9c1536c
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 17, 2025
41ac4d4
push the profile one layer deeper into the runner
not-napoleon Mar 18, 2025
9bc9550
add new parameters everywhere
not-napoleon Mar 18, 2025
0530d40
[CI] Auto commit changes from spotless
Mar 18, 2025
aa6b085
rule layout draft
not-napoleon Mar 20, 2025
a8ad96d
Merge branch 'main' into esql-planner-profile
not-napoleon Mar 27, 2025
845c431
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 27, 2025
6658b96
[CI] Auto commit changes from spotless
Mar 27, 2025
b9c07ee
fix some merge errors
not-napoleon Mar 27, 2025
229c11d
lay out data structures for profile
not-napoleon Mar 31, 2025
6987ac0
Merge remote-tracking branch 'refs/remotes/not-napoleon/esql-planner-…
not-napoleon Mar 31, 2025
5dd319d
[CI] Auto commit changes from spotless
Mar 31, 2025
TransportVersions.java
@@ -180,6 +180,7 @@ static TransportVersion def(int id) {
public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_024_0_00);
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_025_0_00);
public static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE = def(9_026_0_00);
public static final TransportVersion ESQL_PLANNER_PROFILE = def(9_027_0_00);

/*
* STOP! READ THIS FIRST! No, really,
EsqlQueryResponse.java
@@ -27,6 +27,8 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.plugin.CollectedProfiles;

import java.io.IOException;
import java.util.Collections;
@@ -348,13 +350,26 @@ public EsqlResponse responseInternal() {

public static class Profile implements Writeable, ChunkedToXContentObject {
Reviewer comment (Contributor): NIT: I wonder if we could make this a record? This way we could avoid custom equals/hashCode/toString. Merge could be updated to return a new instance instead of mutating this one.

private final List<DriverProfile> drivers;
private final List<PlannerProfile> plannerProfile;

public Profile(List<DriverProfile> drivers) {
public Profile(List<DriverProfile> drivers, List<PlannerProfile> plannerProfile) {
this.drivers = drivers;
this.plannerProfile = plannerProfile;
}

public Profile(CollectedProfiles profiles) {
this.drivers = profiles.getDriverProfiles();
this.plannerProfile = profiles.getPlannerProfiles();
}

public Profile(StreamInput in) throws IOException {
this.drivers = in.readCollectionAsImmutableList(DriverProfile::readFrom);
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
this.plannerProfile = in.readCollectionAsImmutableList(PlannerProfile::readFrom);
} else {
this.plannerProfile = List.of();
}

}

@Override
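The reviewer's record suggestion for Profile would look roughly like the sketch below. This is not part of the PR: it assumes the wire format stays as in the constructor above (driver profiles always, planner profiles gated on ESQL_PLANNER_PROFILE), reuses the imports already present in this file plus java.util.stream.Stream, and guesses at the merge signature, which is not shown in this hunk.

```java
// Sketch only: Profile as a record, per the review comment above. equals/hashCode/toString
// come for free; the ChunkedToXContentObject side is omitted here for brevity.
public record Profile(List<DriverProfile> drivers, List<PlannerProfile> plannerProfiles) implements Writeable {

    public static Profile readFrom(StreamInput in) throws IOException {
        List<DriverProfile> drivers = in.readCollectionAsImmutableList(DriverProfile::readFrom);
        List<PlannerProfile> planners = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)
            ? in.readCollectionAsImmutableList(PlannerProfile::readFrom)
            : List.of();
        return new Profile(drivers, planners);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeCollection(drivers);
        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
            out.writeCollection(plannerProfiles);
        }
    }

    // "return a new instance instead of mutating this one" -- the existing merge method is not
    // shown in this hunk, so this signature is assumed.
    public Profile mergeWith(Profile other) {
        return new Profile(
            Stream.concat(drivers.stream(), other.drivers.stream()).toList(),
            Stream.concat(plannerProfiles.stream(), other.plannerProfiles.stream()).toList()
        );
    }
}
```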
PlannerProfile.java (new file)
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;

/**
* Stores profiling information about the query plan. This can be the top level planning on the coordinating node, or the local
* planning on the data nodes.
*/
public class PlannerProfile implements Writeable, ChunkedToXContentObject {
Reviewer comment (Contributor): It looks like this is still work in progress, but I would suggest we try to design it to be a record from the very beginning.

public static final PlannerProfile EMPTY = new PlannerProfile();

private final boolean isLocalPlanning;

public static PlannerProfile readFrom(StreamInput in) throws IOException {
// NOCOMMIT
throw new UnsupportedOperationException();
}

public PlannerProfile() {
// NOCOMMIT
throw new UnsupportedOperationException();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
// NOCOMMIT
throw new UnsupportedOperationException();
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
// NOCOMMIT
throw new UnsupportedOperationException();
}

}
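A sketch of how the NOCOMMIT placeholders might eventually be filled in, assuming, purely for illustration, that the only datum carried for now is the isLocalPlanning flag already declared above; the XContent field name and the extra Iterators import are assumptions, not part of the PR.

```java
// Hypothetical completion of the stub above (the PR deliberately leaves these as NOCOMMIT).
// Assumes one extra import: org.elasticsearch.common.collect.Iterators
public class PlannerProfile implements Writeable, ChunkedToXContentObject {

    public static final PlannerProfile EMPTY = new PlannerProfile(false);

    private final boolean isLocalPlanning;

    public PlannerProfile(boolean isLocalPlanning) {
        this.isLocalPlanning = isLocalPlanning;
    }

    public static PlannerProfile readFrom(StreamInput in) throws IOException {
        return new PlannerProfile(in.readBoolean());
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeBoolean(isLocalPlanning);
    }

    @Override
    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
        // a single small object, so one chunk is enough
        return Iterators.<ToXContent>single(
            (builder, p) -> builder.startObject().field("local_planning", isLocalPlanning).endObject()
        );
    }
}
```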
ClusterComputeHandler.java
@@ -11,7 +11,6 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
@@ -75,7 +74,7 @@ void startComputeOnRemoteCluster(
RemoteCluster cluster,
Runnable cancelQueryOnFailure,
EsqlExecutionInfo executionInfo,
ActionListener<List<DriverProfile>> listener
ActionListener<CollectedProfiles> listener
) {
var queryPragmas = configuration.pragmas();
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
@@ -87,10 +86,10 @@
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
l.onResponse(List.of());
l.onResponse(CollectedProfiles.EMPTY);
} else if (configuration.allowPartialResults()) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
l.onResponse(List.of());
l.onResponse(CollectedProfiles.EMPTY);
} else {
l.onFailure(e);
}
CollectedProfiles.java (new file)
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;

import java.io.IOException;
import java.util.List;

/**
* Holds the collection of driver profiles and query planning profiles from the individual nodes processing the query.
*/
public class CollectedProfiles implements Writeable {
public static final CollectedProfiles EMPTY = new CollectedProfiles(List.of(), List.of());

private List<DriverProfile> driverProfiles;
private List<PlannerProfile> plannerProfiles;

public CollectedProfiles(List<DriverProfile> driverProfiles, List<PlannerProfile> plannerProfiles) {
this.driverProfiles = driverProfiles;
this.plannerProfiles = plannerProfiles;
}

public CollectedProfiles(StreamInput in) throws IOException {

}

@Override
public void writeTo(StreamOutput out) throws IOException {

}

public List<DriverProfile> getDriverProfiles() {
return driverProfiles;
}

public List<PlannerProfile> getPlannerProfiles() {
return plannerProfiles;
}
}
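The StreamInput constructor and writeTo above are still empty. Since ComputeResponse below only deserializes a CollectedProfiles when the remote node is on or after ESQL_PLANNER_PROFILE, no per-field version gating is needed inside this class, so the bodies would presumably end up as something like this sketch (an assumption; PlannerProfile serialization is itself still stubbed out in this PR):

```java
// Hypothetical wire code for the empty bodies above; it depends on PlannerProfile.readFrom
// and PlannerProfile.writeTo, which are still NOCOMMIT placeholders in this PR.
public CollectedProfiles(StreamInput in) throws IOException {
    this.driverProfiles = in.readCollectionAsImmutableList(DriverProfile::readFrom);
    this.plannerProfiles = in.readCollectionAsImmutableList(PlannerProfile::readFrom);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeCollection(driverProfiles);
    out.writeCollection(plannerProfiles);
}
```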
ComputeListener.java
@@ -14,6 +14,7 @@
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;

import java.util.ArrayList;
import java.util.Collections;
@@ -29,17 +30,19 @@
final class ComputeListener implements Releasable {
private final EsqlRefCountingListener refs;
private final List<DriverProfile> collectedProfiles;
private final List<PlannerProfile> collectedPlannerProfiles;
private final ResponseHeadersCollector responseHeaders;
private final Runnable runOnFailure;

ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener<List<DriverProfile>> delegate) {
ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener<CollectedProfiles> delegate) {
this.runOnFailure = runOnFailure;
this.responseHeaders = new ResponseHeadersCollector(threadPool.getThreadContext());
this.collectedProfiles = Collections.synchronizedList(new ArrayList<>());
this.collectedPlannerProfiles = Collections.synchronizedList(new ArrayList<>());
// listener that executes after all the sub-listeners refs (created via acquireCompute) have completed
this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> {
responseHeaders.finish();
delegate.onResponse(collectedProfiles.stream().toList());
delegate.onResponse(new CollectedProfiles(collectedProfiles.stream().toList(), collectedPlannerProfiles.stream().toList()));
}));
}

@@ -60,12 +63,17 @@ ActionListener<Void> acquireAvoid() {
/**
* Acquires a new listener that collects compute result. This listener will also collect warnings emitted during compute
*/
ActionListener<List<DriverProfile>> acquireCompute() {
ActionListener<CollectedProfiles> acquireCompute() {
final ActionListener<Void> delegate = acquireAvoid();
return ActionListener.wrap(profiles -> {
responseHeaders.collect();
if (profiles != null && profiles.isEmpty() == false) {
collectedProfiles.addAll(profiles);
if (profiles != null) {
if (profiles.getDriverProfiles().isEmpty() == false) {
collectedProfiles.addAll(profiles.getDriverProfiles());
}
if (profiles.getPlannerProfiles().isEmpty() == false) {
collectedPlannerProfiles.addAll(profiles.getPlannerProfiles());
}
}
delegate.onResponse(null);
}, e -> {
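As the comment in this class says, the delegate only fires once every listener handed out by acquireCompute() has completed, at which point the driver and planner profiles collected from each sub-computation are merged into a single CollectedProfiles. A hypothetical caller might look like the sketch below; it is not from the PR and assumes only the constructor and CollectedProfiles API shown above, plus ActionRunnable from the core action package.

```java
// Illustrative fan-out: N sub-computations, one merged CollectedProfiles delivered to `done`.
void fanOut(ThreadPool threadPool, List<Runnable> tasks, ActionListener<CollectedProfiles> done) {
    try (ComputeListener computeListener = new ComputeListener(threadPool, () -> {}, done)) {
        for (Runnable task : tasks) {
            ActionListener<CollectedProfiles> sub = computeListener.acquireCompute();
            threadPool.generic().execute(ActionRunnable.supply(sub, () -> {
                task.run();                     // the real sub-computation would happen here
                return CollectedProfiles.EMPTY; // or whatever profiles that sub-computation produced
            }));
        }
    } // close() releases the initial ref; `done` fires after every `sub` has responded
}
```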
ComputeResponse.java
@@ -21,7 +21,7 @@
* The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest}
*/
final class ComputeResponse extends TransportResponse {
private final List<DriverProfile> profiles;
private final CollectedProfiles profiles;

// for use with ClusterComputeRequests (cross-cluster searches)
private final TimeValue took; // overall took time for a specific cluster in a cross-cluster search
@@ -30,12 +30,12 @@ final class ComputeResponse extends TransportResponse {
public final int skippedShards;
public final int failedShards;

ComputeResponse(List<DriverProfile> profiles) {
ComputeResponse(CollectedProfiles profiles) {
this(profiles, null, null, null, null, null);
}

ComputeResponse(
List<DriverProfile> profiles,
CollectedProfiles profiles,
TimeValue took,
Integer totalShards,
Integer successfulShards,
@@ -54,7 +54,11 @@ final class ComputeResponse extends TransportResponse {
super(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
if (in.readBoolean()) {
profiles = in.readCollectionAsImmutableList(DriverProfile::readFrom);
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
profiles = new CollectedProfiles(in);
} else {
profiles = new CollectedProfiles(in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
}
} else {
profiles = null;
}
@@ -83,7 +87,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeCollection(profiles);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
profiles.writeTo(out);
} else {
out.writeCollection(profiles.getDriverProfiles());
}
}
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
@@ -95,7 +103,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public List<DriverProfile> getProfiles() {
public CollectedProfiles getProfiles() {
return profiles;
}

ComputeService.java
@@ -17,7 +17,6 @@
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
@@ -52,6 +51,7 @@
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.Result;
@@ -172,7 +172,13 @@ public void execute(
try (
var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
return new Result(physicalPlan.output(), collectedPages, profiles, execInfo);
return new Result(
physicalPlan.output(),
collectedPages,
profiles.getDriverProfiles(),
profiles.getPlannerProfiles(),
execInfo
);
}))
) {
runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute());
@@ -204,7 +210,7 @@ public void execute(
exchangeService.addExchangeSourceHandler(sessionId, exchangeSource);
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
return new Result(outputAttributes, collectedPages, profiles, execInfo);
return new Result(outputAttributes, collectedPages, profiles.getDriverProfiles(), profiles.getPlannerProfiles(), execInfo);
}))) {
try (Releasable ignored = exchangeSource.addEmptySink()) {
// run compute on the coordinator
@@ -279,7 +285,7 @@ public void execute(
EsqlExecutionInfo.Cluster.Status.PARTIAL
).setFailures(List.of(new ShardSearchFailure(e))).build()
);
dataNodesListener.onResponse(List.of());
dataNodesListener.onResponse(CollectedProfiles.EMPTY);
} else {
dataNodesListener.onFailure(e);
}
@@ -339,7 +345,12 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn
}
}

void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<List<DriverProfile>> listener) {
void runCompute(
CancellableTask task,
ComputeContext context,
PhysicalPlan plan,
ActionListener<CollectedProfiles> listener
) {
listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());
for (int i = 0; i < context.searchContexts().size(); i++) {
@@ -358,6 +369,7 @@ public SourceProvider createSourceProvider() {
);
}
final List<Driver> drivers;
final PlannerProfile localPlannerProfile = new PlannerProfile();
try {
LocalExecutionPlanner planner = new LocalExecutionPlanner(
context.sessionId(),
@@ -396,9 +408,9 @@ public SourceProvider createSourceProvider() {
}
ActionListener<Void> listenerCollectingStatus = listener.map(ignored -> {
if (context.configuration().profile()) {
return drivers.stream().map(Driver::profile).toList();
return new CollectedProfiles(drivers.stream().map(Driver::profile).toList(), List.of(localPlannerProfile));
} else {
return List.of();
return CollectedProfiles.EMPTY;
}
});
listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers));
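The Result constructor calls in this file now pass planner profiles alongside the driver profiles and pages, which implies a matching change to org.elasticsearch.xpack.esql.session.Result that is not included in this diff. Its shape is presumably close to the following sketch; the component names are inferred from the call sites above, not taken from the PR.

```java
// Hypothetical shape of the widened Result record implied by the five-argument calls above.
public record Result(
    List<Attribute> output,
    List<Page> pages,
    List<DriverProfile> driverProfiles,
    List<PlannerProfile> plannerProfiles,
    EsqlExecutionInfo executionInfo
) {}
```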