Skip to content

Commit bd1567c

Browse files
committed
Everything Compiles!
1 parent 838cf28 commit bd1567c

File tree

16 files changed

+130
-99
lines changed

16 files changed

+130
-99
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
2929
import org.elasticsearch.xpack.esql.core.type.DataType;
3030
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
31+
import org.elasticsearch.xpack.esql.plugin.CollectedProfiles;
3132

3233
import java.io.IOException;
3334
import java.util.Collections;
@@ -349,19 +350,24 @@ public EsqlResponse responseInternal() {
349350

350351
public static class Profile implements Writeable, ChunkedToXContentObject {
351352
private final List<DriverProfile> drivers;
352-
private final PlannerProfile plannerProfile;
353+
private final List<PlannerProfile> plannerProfile;
353354

354-
public Profile(List<DriverProfile> drivers, PlannerProfile plannerProfile) {
355+
public Profile(List<DriverProfile> drivers, List<PlannerProfile> plannerProfile) {
355356
this.drivers = drivers;
356357
this.plannerProfile = plannerProfile;
357358
}
358359

360+
public Profile(CollectedProfiles profiles) {
361+
this.drivers = profiles.getDriverProfiles();
362+
this.plannerProfile = profiles.getPlannerProfiles();
363+
}
364+
359365
public Profile(StreamInput in) throws IOException {
360366
this.drivers = in.readCollectionAsImmutableList(DriverProfile::readFrom);
361367
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
362-
this.plannerProfile = new PlannerProfile(in);
368+
this.plannerProfile = in.readCollectionAsImmutableList(PlannerProfile::readFrom);
363369
} else {
364-
this.plannerProfile = PlannerProfile.EMPTY;
370+
this.plannerProfile = List.of();
365371
}
366372

367373
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerProfile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ public class PlannerProfile implements Writeable, ChunkedToXContentObject {
2626

2727
private final boolean isLocalPlanning;
2828

29-
public PlannerProfile() {
29+
public static PlannerProfile readFrom(StreamInput in) throws IOException {
3030
// NOCOMMIT
3131
throw new UnsupportedOperationException();
3232
}
3333

34-
public PlannerProfile(StreamInput in) throws IOException {
34+
public PlannerProfile() {
3535
// NOCOMMIT
3636
throw new UnsupportedOperationException();
3737
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ void startComputeOnRemoteCluster(
7474
RemoteCluster cluster,
7575
Runnable cancelQueryOnFailure,
7676
EsqlExecutionInfo executionInfo,
77-
ActionListener<ComputeListener.CollectedProfiles> listener
77+
ActionListener<CollectedProfiles> listener
7878
) {
7979
var queryPragmas = configuration.pragmas();
8080
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
@@ -86,10 +86,10 @@ void startComputeOnRemoteCluster(
8686
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
8787
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
8888
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
89-
l.onResponse(ComputeListener.CollectedProfiles.EMPTY);
89+
l.onResponse(CollectedProfiles.EMPTY);
9090
} else if (configuration.allowPartialResults()) {
9191
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
92-
l.onResponse(ComputeListener.CollectedProfiles.EMPTY);
92+
l.onResponse(CollectedProfiles.EMPTY);
9393
} else {
9494
l.onFailure(e);
9595
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
import org.elasticsearch.compute.operator.DriverProfile;
14+
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
19+
/**
20+
* Holds the collection of driver profiles and query planning profiles from the individual nodes processing the query.
21+
*/
22+
public class CollectedProfiles implements Writeable {
23+
public static final CollectedProfiles EMPTY = new CollectedProfiles(List.of(), List.of());
24+
25+
private List<DriverProfile> driverProfiles;
26+
private List<PlannerProfile> plannerProfiles;
27+
28+
public CollectedProfiles(List<DriverProfile> driverProfiles, List<PlannerProfile> plannerProfiles) {
29+
this.driverProfiles = driverProfiles;
30+
this.plannerProfiles = plannerProfiles;
31+
}
32+
33+
public CollectedProfiles(StreamInput in) throws IOException {
34+
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
40+
}
41+
42+
public List<DriverProfile> getDriverProfiles() {
43+
return driverProfiles;
44+
}
45+
46+
public List<PlannerProfile> getPlannerProfiles() {
47+
return plannerProfiles;
48+
}
49+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,13 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.RefCountingListener;
12-
import org.elasticsearch.common.io.stream.StreamInput;
13-
import org.elasticsearch.common.io.stream.StreamOutput;
14-
import org.elasticsearch.common.io.stream.Writeable;
1512
import org.elasticsearch.compute.EsqlRefCountingListener;
1613
import org.elasticsearch.compute.operator.DriverProfile;
1714
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
1815
import org.elasticsearch.core.Releasable;
1916
import org.elasticsearch.threadpool.ThreadPool;
2017
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
2118

22-
import java.io.IOException;
2319
import java.util.ArrayList;
2420
import java.util.Collections;
2521
import java.util.List;
@@ -38,38 +34,6 @@ final class ComputeListener implements Releasable {
3834
private final ResponseHeadersCollector responseHeaders;
3935
private final Runnable runOnFailure;
4036

41-
/**
42-
* Holds the collection of driver profiles and query planning profiles from the individual nodes processing the query.
43-
*/
44-
public static class CollectedProfiles implements Writeable {
45-
public static final CollectedProfiles EMPTY = new CollectedProfiles(List.of(), List.of());
46-
47-
private List<DriverProfile> driverProfiles;
48-
private List<PlannerProfile> plannerProfiles;
49-
50-
CollectedProfiles(List<DriverProfile> driverProfiles, List<PlannerProfile> plannerProfiles) {
51-
this.driverProfiles = driverProfiles;
52-
this.plannerProfiles = plannerProfiles;
53-
}
54-
55-
CollectedProfiles(StreamInput in) throws IOException {
56-
57-
}
58-
59-
@Override
60-
public void writeTo(StreamOutput out) throws IOException {
61-
62-
}
63-
64-
public List<DriverProfile> getDriverProfiles() {
65-
return driverProfiles;
66-
}
67-
68-
public List<PlannerProfile> getPlannerProfiles() {
69-
return plannerProfiles;
70-
}
71-
}
72-
7337
ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener<CollectedProfiles> delegate) {
7438
this.runOnFailure = runOnFailure;
7539
this.responseHeaders = new ResponseHeadersCollector(threadPool.getThreadContext());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
* The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest}
2222
*/
2323
final class ComputeResponse extends TransportResponse {
24-
private final ComputeListener.CollectedProfiles profiles;
24+
private final CollectedProfiles profiles;
2525

2626
// for use with ClusterComputeRequests (cross-cluster searches)
2727
private final TimeValue took; // overall took time for a specific cluster in a cross-cluster search
@@ -30,12 +30,12 @@ final class ComputeResponse extends TransportResponse {
3030
public final int skippedShards;
3131
public final int failedShards;
3232

33-
ComputeResponse(ComputeListener.CollectedProfiles profiles) {
33+
ComputeResponse(CollectedProfiles profiles) {
3434
this(profiles, null, null, null, null, null);
3535
}
3636

3737
ComputeResponse(
38-
ComputeListener.CollectedProfiles profiles,
38+
CollectedProfiles profiles,
3939
TimeValue took,
4040
Integer totalShards,
4141
Integer successfulShards,
@@ -55,9 +55,9 @@ final class ComputeResponse extends TransportResponse {
5555
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
5656
if (in.readBoolean()) {
5757
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
58-
profiles = new ComputeListener.CollectedProfiles(in);
58+
profiles = new CollectedProfiles(in);
5959
} else {
60-
profiles = new ComputeListener.CollectedProfiles(in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
60+
profiles = new CollectedProfiles(in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
6161
}
6262
} else {
6363
profiles = null;
@@ -103,7 +103,7 @@ public void writeTo(StreamOutput out) throws IOException {
103103
}
104104
}
105105

106-
public ComputeListener.CollectedProfiles getProfiles() {
106+
public CollectedProfiles getProfiles() {
107107
return profiles;
108108
}
109109

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
5252
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
5353
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
54+
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
5455
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
5556
import org.elasticsearch.xpack.esql.session.Configuration;
5657
import org.elasticsearch.xpack.esql.session.Result;
@@ -284,7 +285,7 @@ public void execute(
284285
EsqlExecutionInfo.Cluster.Status.PARTIAL
285286
).setFailures(List.of(new ShardSearchFailure(e))).build()
286287
);
287-
dataNodesListener.onResponse(ComputeListener.CollectedProfiles.EMPTY);
288+
dataNodesListener.onResponse(CollectedProfiles.EMPTY);
288289
} else {
289290
dataNodesListener.onFailure(e);
290291
}
@@ -348,7 +349,7 @@ void runCompute(
348349
CancellableTask task,
349350
ComputeContext context,
350351
PhysicalPlan plan,
351-
ActionListener<ComputeListener.CollectedProfiles> listener
352+
ActionListener<CollectedProfiles> listener
352353
) {
353354
listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
354355
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());
@@ -368,6 +369,7 @@ public SourceProvider createSourceProvider() {
368369
);
369370
}
370371
final List<Driver> drivers;
372+
final PlannerProfile localPlannerProfile = new PlannerProfile();
371373
try {
372374
LocalExecutionPlanner planner = new LocalExecutionPlanner(
373375
context.sessionId(),
@@ -406,9 +408,9 @@ public SourceProvider createSourceProvider() {
406408
}
407409
ActionListener<Void> listenerCollectingStatus = listener.map(ignored -> {
408410
if (context.configuration().profile()) {
409-
return drivers.stream().map(Driver::profile).toList();
411+
return new CollectedProfiles(drivers.stream().map(Driver::profile).toList(), List.of(localPlannerProfile));
410412
} else {
411-
return List.of();
413+
return CollectedProfiles.EMPTY;
412414
}
413415
});
414416
listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,11 @@ private void runBatch(int startBatchIndex) {
237237
final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size());
238238
final AtomicInteger pagesProduced = new AtomicInteger();
239239
List<ShardId> shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex);
240-
ActionListener<ComputeListener.CollectedProfiles> batchListener = new ActionListener<>() {
241-
final ActionListener<ComputeListener.CollectedProfiles> ref = computeListener.acquireCompute();
240+
ActionListener<CollectedProfiles> batchListener = new ActionListener<>() {
241+
final ActionListener<CollectedProfiles> ref = computeListener.acquireCompute();
242242

243243
@Override
244-
public void onResponse(ComputeListener.CollectedProfiles result) {
244+
public void onResponse(CollectedProfiles result) {
245245
try {
246246
onBatchCompleted(endBatchIndex);
247247
} finally {
@@ -255,7 +255,7 @@ public void onFailure(Exception e) {
255255
for (ShardId shardId : shardIds) {
256256
addShardLevelFailure(shardId, e);
257257
}
258-
onResponse(ComputeListener.CollectedProfiles.EMPTY);
258+
onResponse(CollectedProfiles.EMPTY);
259259
} else {
260260
// TODO: add these to fatal failures so we can continue processing other shards.
261261
try {
@@ -269,7 +269,7 @@ public void onFailure(Exception e) {
269269
acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
270270
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ESQL_WORKER_THREAD_POOL_NAME);
271271
if (searchContexts.isEmpty()) {
272-
batchListener.onResponse(ComputeListener.CollectedProfiles.EMPTY);
272+
batchListener.onResponse(CollectedProfiles.EMPTY);
273273
return;
274274
}
275275
var computeContext = new ComputeContext(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.compute.operator.DriverProfile;
1414
import org.elasticsearch.index.shard.ShardId;
1515
import org.elasticsearch.transport.TransportResponse;
16-
import org.elasticsearch.xpack.esql.planner.PlannerProfile;
1716

1817
import java.io.IOException;
1918
import java.util.List;
@@ -24,27 +23,27 @@
2423
* The compute result of {@link DataNodeRequest}
2524
*/
2625
final class DataNodeComputeResponse extends TransportResponse {
27-
private final ComputeListener.CollectedProfiles profiles;
26+
private final CollectedProfiles profiles;
2827
private final Map<ShardId, Exception> shardLevelFailures;
2928

30-
DataNodeComputeResponse(ComputeListener.CollectedProfiles profiles, Map<ShardId, Exception> shardLevelFailures) {
29+
DataNodeComputeResponse(CollectedProfiles profiles, Map<ShardId, Exception> shardLevelFailures) {
3130
this.profiles = profiles;
3231
this.shardLevelFailures = shardLevelFailures;
3332
}
3433

3534
DataNodeComputeResponse(StreamInput in) throws IOException {
3635
if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) {
3736
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
38-
this.profiles = new ComputeListener.CollectedProfiles(in);
37+
this.profiles = new CollectedProfiles(in);
3938
} else {
40-
this.profiles = new ComputeListener.CollectedProfiles(in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
39+
this.profiles = new CollectedProfiles(in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
4140
}
4241
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
4342
} else {
4443
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PLANNER_PROFILE)) {
45-
this.profiles = new ComputeListener.CollectedProfiles(in);
44+
this.profiles = new CollectedProfiles(in);
4645
} else {
47-
this.profiles = new ComputeListener.CollectedProfiles(
46+
this.profiles = new CollectedProfiles(
4847
Objects.requireNonNullElse(new ComputeResponse(in).getProfiles().getDriverProfiles(), List.of()),
4948
List.of()
5049
);
@@ -70,7 +69,7 @@ public void writeTo(StreamOutput out) throws IOException {
7069
}
7170
}
7271

73-
ComputeListener.CollectedProfiles profiles() {
72+
CollectedProfiles profiles() {
7473
return profiles;
7574
}
7675

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.cluster.node.DiscoveryNode;
2020
import org.elasticsearch.common.breaker.CircuitBreakingException;
2121
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
22-
import org.elasticsearch.compute.operator.DriverProfile;
2322
import org.elasticsearch.compute.operator.FailureCollector;
2423
import org.elasticsearch.core.TimeValue;
2524
import org.elasticsearch.index.Index;
@@ -155,9 +154,9 @@ private void reportFailures(ComputeListener computeListener) {
155154
}
156155

157156
private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
158-
final ActionListener<ComputeListener.CollectedProfiles> listener = computeListener.acquireCompute();
157+
final ActionListener<CollectedProfiles> listener = computeListener.acquireCompute();
159158
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
160-
void onAfter(ComputeListener.CollectedProfiles profiles) {
159+
void onAfter(CollectedProfiles profiles) {
161160
nodePermits.get(request.node).release();
162161
trySendingRequestsForPendingShards(targetShards, computeListener);
163162
listener.onResponse(profiles);
@@ -185,7 +184,7 @@ public void onFailure(Exception e, boolean receivedData) {
185184
trackShardLevelFailure(shardId, receivedData, e);
186185
pendingShardIds.add(shardId);
187186
}
188-
onAfter(ComputeListener.CollectedProfiles.EMPTY);
187+
onAfter(CollectedProfiles.EMPTY);
189188
}
190189
});
191190
}

0 commit comments

Comments
 (0)