Skip to content

Commit 94de73a

Browse files
authored
Enable node-level reduction by default (elastic#119621) (elastic#119688)
This change enables node-level reduction by default in ES|QL. However, if the coordinator node and the target data node are the same, node-level reduction is disabled to avoid unnecessary overhead.
1 parent 4adebcf commit 94de73a

File tree

8 files changed

+127
-48
lines changed

8 files changed

+127
-48
lines changed

docs/changelog/119621.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 119621
2+
summary: Enable node-level reduction by default
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ static TransportVersion def(int id) {
155155
public static final TransportVersion NODE_SHUTDOWN_EPHEMERAL_ID_ADDED = def(8_815_00_0);
156156
public static final TransportVersion ESQL_CCS_TELEMETRY_STATS = def(8_816_00_0);
157157
public static final TransportVersion TEXT_EMBEDDING_QUERY_VECTOR_BUILDER_INFER_MODEL_ID = def(8_817_00_0);
158+
public static final TransportVersion ESQL_ENABLE_NODE_LEVEL_REDUCTION = def(8_818_00_0);
158159

159160
/*
160161
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,13 @@ private ActionFuture<EsqlQueryResponse> startEsql(String query) {
240240
// Report the status after every action
241241
.put("status_interval", "0ms");
242242

243-
if (nodeLevelReduction == false) {
244-
// explicitly set the default (false) or don't
243+
if (nodeLevelReduction) {
244+
// explicitly set the default (true) or don't
245245
if (randomBoolean()) {
246-
settingsBuilder.put("node_level_reduction", nodeLevelReduction);
246+
settingsBuilder.put("node_level_reduction", true);
247247
}
248248
} else {
249-
settingsBuilder.put("node_level_reduction", nodeLevelReduction);
249+
settingsBuilder.put("node_level_reduction", false);
250250
}
251251

252252
var pragmas = new QueryPragmas(settingsBuilder.build());
@@ -273,14 +273,7 @@ private void cancelTask(TaskId taskId) {
273273
private List<TaskInfo> getTasksStarting() throws Exception {
274274
List<TaskInfo> foundTasks = new ArrayList<>();
275275
assertBusy(() -> {
276-
List<TaskInfo> tasks = client().admin()
277-
.cluster()
278-
.prepareListTasks()
279-
.setActions(DriverTaskRunner.ACTION_NAME)
280-
.setDetailed(true)
281-
.get()
282-
.getTasks();
283-
assertThat(tasks, hasSize(equalTo(3)));
276+
List<TaskInfo> tasks = getDriverTasks();
284277
for (TaskInfo task : tasks) {
285278
assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME));
286279
DriverStatus status = (DriverStatus) task.status();
@@ -305,14 +298,7 @@ private List<TaskInfo> getTasksStarting() throws Exception {
305298
private List<TaskInfo> getTasksRunning() throws Exception {
306299
List<TaskInfo> foundTasks = new ArrayList<>();
307300
assertBusy(() -> {
308-
List<TaskInfo> tasks = client().admin()
309-
.cluster()
310-
.prepareListTasks()
311-
.setActions(DriverTaskRunner.ACTION_NAME)
312-
.setDetailed(true)
313-
.get()
314-
.getTasks();
315-
assertThat(tasks, hasSize(equalTo(3)));
301+
List<TaskInfo> tasks = getDriverTasks();
316302
for (TaskInfo task : tasks) {
317303
assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME));
318304
DriverStatus status = (DriverStatus) task.status();
@@ -328,6 +314,37 @@ private List<TaskInfo> getTasksRunning() throws Exception {
328314
return foundTasks;
329315
}
330316

317+
/**
318+
* Fetches tasks until all three driver tasks have started
319+
*/
320+
private List<TaskInfo> getDriverTasks() throws Exception {
321+
List<TaskInfo> foundTasks = new ArrayList<>();
322+
assertBusy(() -> {
323+
List<TaskInfo> tasks = client().admin()
324+
.cluster()
325+
.prepareListTasks()
326+
.setActions(DriverTaskRunner.ACTION_NAME)
327+
.setDetailed(true)
328+
.get()
329+
.getTasks();
330+
assertThat(tasks, hasSize(equalTo(3)));
331+
List<TaskInfo> readTasks = tasks.stream().filter(t -> t.description().equals(READ_DESCRIPTION)).toList();
332+
List<TaskInfo> mergeTasks = tasks.stream().filter(t -> t.description().equals(MERGE_DESCRIPTION)).toList();
333+
assertThat(readTasks, hasSize(1));
334+
assertThat(mergeTasks, hasSize(1));
335+
// node-level reduction is disabled when the target data node is also the coordinator
336+
if (readTasks.get(0).node().equals(mergeTasks.get(0).node())) {
337+
REDUCE_DESCRIPTION = """
338+
\\_ExchangeSourceOperator[]
339+
\\_ExchangeSinkOperator""";
340+
}
341+
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().equals(REDUCE_DESCRIPTION)).toList();
342+
assertThat(reduceTasks, hasSize(1));
343+
foundTasks.addAll(tasks);
344+
});
345+
return foundTasks;
346+
}
347+
331348
private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Exception {
332349
Exception e = expectThrows(Exception.class, response);
333350
Throwable cancelException = ExceptionsHelper.unwrap(e, TaskCancelledException.class);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -358,19 +358,22 @@ private void startComputeOnDataNodes(
358358
exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop());
359359
ActionListener<ComputeResponse> computeResponseListener = computeListener.acquireCompute(clusterAlias);
360360
var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null));
361+
final boolean sameNode = transportService.getLocalNode().getId().equals(node.connection.getNode().getId());
362+
var dataNodeRequest = new DataNodeRequest(
363+
childSessionId,
364+
configuration,
365+
clusterAlias,
366+
node.shardIds,
367+
node.aliasFilters,
368+
dataNodePlan,
369+
originalIndices.indices(),
370+
originalIndices.indicesOptions(),
371+
sameNode == false && queryPragmas.nodeLevelReduction()
372+
);
361373
transportService.sendChildRequest(
362374
node.connection,
363375
DATA_ACTION_NAME,
364-
new DataNodeRequest(
365-
childSessionId,
366-
configuration,
367-
clusterAlias,
368-
node.shardIds,
369-
node.aliasFilters,
370-
dataNodePlan,
371-
originalIndices.indices(),
372-
originalIndices.indicesOptions()
373-
),
376+
dataNodeRequest,
374377
parentTask,
375378
TransportRequestOptions.EMPTY,
376379
new ActionListenerResponseHandler<>(dataNodeListener, ComputeResponse::new, esqlExecutor)
@@ -803,7 +806,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
803806
final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
804807
final PhysicalPlan reductionPlan;
805808
if (request.plan() instanceof ExchangeSinkExec plan) {
806-
reductionPlan = reductionPlan(plan, request.pragmas().nodeLevelReduction());
809+
reductionPlan = reductionPlan(plan, request.runNodeLevelReduction());
807810
} else {
808811
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));
809812
return;
@@ -817,7 +820,8 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
817820
request.aliasFilters(),
818821
request.plan(),
819822
request.indices(),
820-
request.indicesOptions()
823+
request.indicesOptions(),
824+
request.runNodeLevelReduction()
821825
);
822826
try (var computeListener = ComputeListener.create(transportService, (CancellableTask) task, listener)) {
823827
runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, computeListener);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ final class DataNodeRequest extends TransportRequest implements IndicesRequest.R
5454
private List<ShardId> shardIds;
5555
private String[] indices;
5656
private final IndicesOptions indicesOptions;
57+
private final boolean runNodeLevelReduction;
5758

5859
DataNodeRequest(
5960
String sessionId,
@@ -63,7 +64,8 @@ final class DataNodeRequest extends TransportRequest implements IndicesRequest.R
6364
Map<Index, AliasFilter> aliasFilters,
6465
PhysicalPlan plan,
6566
String[] indices,
66-
IndicesOptions indicesOptions
67+
IndicesOptions indicesOptions,
68+
boolean runNodeLevelReduction
6769
) {
6870
this.sessionId = sessionId;
6971
this.configuration = configuration;
@@ -73,6 +75,7 @@ final class DataNodeRequest extends TransportRequest implements IndicesRequest.R
7375
this.plan = plan;
7476
this.indices = indices;
7577
this.indicesOptions = indicesOptions;
78+
this.runNodeLevelReduction = runNodeLevelReduction;
7679
}
7780

7881
DataNodeRequest(StreamInput in) throws IOException {
@@ -97,6 +100,11 @@ final class DataNodeRequest extends TransportRequest implements IndicesRequest.R
97100
this.indices = shardIds.stream().map(ShardId::getIndexName).distinct().toArray(String[]::new);
98101
this.indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
99102
}
103+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENABLE_NODE_LEVEL_REDUCTION)) {
104+
this.runNodeLevelReduction = in.readBoolean();
105+
} else {
106+
this.runNodeLevelReduction = false;
107+
}
100108
}
101109

102110
@Override
@@ -114,6 +122,9 @@ public void writeTo(StreamOutput out) throws IOException {
114122
out.writeStringArray(indices);
115123
indicesOptions.writeIndicesOptions(out);
116124
}
125+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENABLE_NODE_LEVEL_REDUCTION)) {
126+
out.writeBoolean(runNodeLevelReduction);
127+
}
117128
}
118129

119130
@Override
@@ -186,6 +197,10 @@ PhysicalPlan plan() {
186197
return plan;
187198
}
188199

200+
boolean runNodeLevelReduction() {
201+
return runNodeLevelReduction;
202+
}
203+
189204
@Override
190205
public String getDescription() {
191206
return "shards=" + shardIds + " plan=" + plan;
@@ -209,11 +224,22 @@ public boolean equals(Object o) {
209224
&& plan.equals(request.plan)
210225
&& getParentTask().equals(request.getParentTask())
211226
&& Arrays.equals(indices, request.indices)
212-
&& indicesOptions.equals(request.indicesOptions);
227+
&& indicesOptions.equals(request.indicesOptions)
228+
&& runNodeLevelReduction == request.runNodeLevelReduction;
213229
}
214230

215231
@Override
216232
public int hashCode() {
217-
return Objects.hash(sessionId, configuration, clusterAlias, shardIds, aliasFilters, plan, Arrays.hashCode(indices), indicesOptions);
233+
return Objects.hash(
234+
sessionId,
235+
configuration,
236+
clusterAlias,
237+
shardIds,
238+
aliasFilters,
239+
plan,
240+
Arrays.hashCode(indices),
241+
indicesOptions,
242+
runNodeLevelReduction
243+
);
218244
}
219245
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public final class QueryPragmas implements Writeable {
5555

5656
public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
5757

58-
public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", false);
58+
public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true);
5959

6060
public static final QueryPragmas EMPTY = new QueryPragmas(Settings.EMPTY);
6161

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSerializationTests.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,16 @@ protected DataNodeRequest createTestInstance() {
9292
aliasFilters,
9393
physicalPlan,
9494
generateRandomStringArray(10, 10, false, false),
95-
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
95+
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()),
96+
randomBoolean()
9697
);
9798
request.setParentTask(randomAlphaOfLength(10), randomNonNegativeLong());
9899
return request;
99100
}
100101

101102
@Override
102103
protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException {
103-
return switch (between(0, 8)) {
104+
return switch (between(0, 9)) {
104105
case 0 -> {
105106
var request = new DataNodeRequest(
106107
randomAlphaOfLength(20),
@@ -110,7 +111,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
110111
in.aliasFilters(),
111112
in.plan(),
112113
in.indices(),
113-
in.indicesOptions()
114+
in.indicesOptions(),
115+
in.runNodeLevelReduction()
114116
);
115117
request.setParentTask(in.getParentTask());
116118
yield request;
@@ -124,7 +126,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
124126
in.aliasFilters(),
125127
in.plan(),
126128
in.indices(),
127-
in.indicesOptions()
129+
in.indicesOptions(),
130+
in.runNodeLevelReduction()
128131
);
129132
request.setParentTask(in.getParentTask());
130133
yield request;
@@ -139,7 +142,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
139142
in.aliasFilters(),
140143
in.plan(),
141144
in.indices(),
142-
in.indicesOptions()
145+
in.indicesOptions(),
146+
in.runNodeLevelReduction()
143147
);
144148
request.setParentTask(in.getParentTask());
145149
yield request;
@@ -166,7 +170,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
166170
in.aliasFilters(),
167171
mapAndMaybeOptimize(parse(newQuery)),
168172
in.indices(),
169-
in.indicesOptions()
173+
in.indicesOptions(),
174+
in.runNodeLevelReduction()
170175
);
171176
request.setParentTask(in.getParentTask());
172177
yield request;
@@ -186,7 +191,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
186191
aliasFilters,
187192
in.plan(),
188193
in.indices(),
189-
in.indicesOptions()
194+
in.indicesOptions(),
195+
in.runNodeLevelReduction()
190196
);
191197
request.setParentTask(request.getParentTask());
192198
yield request;
@@ -200,7 +206,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
200206
in.aliasFilters(),
201207
in.plan(),
202208
in.indices(),
203-
in.indicesOptions()
209+
in.indicesOptions(),
210+
in.runNodeLevelReduction()
204211
);
205212
request.setParentTask(
206213
randomValueOtherThan(request.getParentTask().getNodeId(), () -> randomAlphaOfLength(10)),
@@ -218,7 +225,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
218225
in.aliasFilters(),
219226
in.plan(),
220227
in.indices(),
221-
in.indicesOptions()
228+
in.indicesOptions(),
229+
in.runNodeLevelReduction()
222230
);
223231
request.setParentTask(request.getParentTask());
224232
yield request;
@@ -233,7 +241,8 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
233241
in.aliasFilters(),
234242
in.plan(),
235243
indices,
236-
in.indicesOptions()
244+
in.indicesOptions(),
245+
in.runNodeLevelReduction()
237246
);
238247
request.setParentTask(request.getParentTask());
239248
yield request;
@@ -251,7 +260,23 @@ protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException
251260
in.aliasFilters(),
252261
in.plan(),
253262
in.indices(),
254-
indicesOptions
263+
indicesOptions,
264+
in.runNodeLevelReduction()
265+
);
266+
request.setParentTask(request.getParentTask());
267+
yield request;
268+
}
269+
case 9 -> {
270+
var request = new DataNodeRequest(
271+
in.sessionId(),
272+
in.configuration(),
273+
in.clusterAlias(),
274+
in.shardIds(),
275+
in.aliasFilters(),
276+
in.plan(),
277+
in.indices(),
278+
in.indicesOptions(),
279+
in.runNodeLevelReduction() == false
255280
);
256281
request.setParentTask(request.getParentTask());
257282
yield request;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public void testNoIndexPlaceholder() {
3939
Collections.emptyMap(),
4040
null,
4141
generateRandomStringArray(10, 10, false, false),
42-
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
42+
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()),
43+
randomBoolean()
4344
);
4445

4546
assertThat(request.shardIds(), equalTo(shardIds));

0 commit comments

Comments
 (0)