Skip to content

Commit 5d9ad67

Browse files
authored
Move node-level reduction plan to data node (#117422)
This change moves the logic for extracting the node-level reduction plan from the coordinator to the data node. Doing this on the data node has several benefits: 1. It minimizes serialization, especially for inter-cluster communication. 2. It makes it possible to resolve the row size estimation issue, because the plan is now generated on the data nodes; the fix itself will be addressed in a follow-up. 3. It allows each cluster to decide whether to run node-level reduction based on its own topology.
1 parent fadc752 commit 5d9ad67

File tree

7 files changed

+56
-57
lines changed

7 files changed

+56
-57
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ static TransportVersion def(int id) {
206206
public static final TransportVersion INGEST_PIPELINE_CONFIGURATION_AS_MAP = def(8_797_00_0);
207207
public static final TransportVersion INDEXING_PRESSURE_THROTTLING_STATS = def(8_798_00_0);
208208
public static final TransportVersion REINDEX_DATA_STREAMS = def(8_799_00_0);
209-
209+
public static final TransportVersion ESQL_REMOVE_NODE_LEVEL_PLAN = def(8_800_00_0);
210210
/*
211211
* STOP! READ THIS FIRST! No, really,
212212
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
7373
Source.EMPTY,
7474
new Project(logicalFragment.source(), logicalFragment, output),
7575
fragmentExec.esFilter(),
76-
fragmentExec.estimatedRowSize(),
77-
fragmentExec.reducer()
76+
fragmentExec.estimatedRowSize()
7877
);
7978
return new ExchangeExec(exec.source(), output, exec.inBetweenAggs(), newChild);
8079
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExec.java

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
3131

3232
private final LogicalPlan fragment;
3333
private final QueryBuilder esFilter;
34-
private final PhysicalPlan reducer; // datanode-level physical plan node that performs an intermediate (not partial) reduce
3534

3635
/**
3736
* Estimate of the number of bytes that'll be loaded per position before
@@ -40,35 +39,42 @@ public class FragmentExec extends LeafExec implements EstimatesRowSize {
4039
private final int estimatedRowSize;
4140

4241
public FragmentExec(LogicalPlan fragment) {
43-
this(fragment.source(), fragment, null, 0, null);
42+
this(fragment.source(), fragment, null, 0);
4443
}
4544

46-
public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize, PhysicalPlan reducer) {
45+
public FragmentExec(Source source, LogicalPlan fragment, QueryBuilder esFilter, int estimatedRowSize) {
4746
super(source);
4847
this.fragment = fragment;
4948
this.esFilter = esFilter;
5049
this.estimatedRowSize = estimatedRowSize;
51-
this.reducer = reducer;
5250
}
5351

5452
private FragmentExec(StreamInput in) throws IOException {
55-
this(
56-
Source.readFrom((PlanStreamInput) in),
57-
in.readNamedWriteable(LogicalPlan.class),
58-
in.readOptionalNamedWriteable(QueryBuilder.class),
59-
in.readOptionalVInt(),
60-
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readOptionalNamedWriteable(PhysicalPlan.class) : null
61-
);
53+
super(Source.readFrom((PlanStreamInput) in));
54+
this.fragment = in.readNamedWriteable(LogicalPlan.class);
55+
this.esFilter = in.readOptionalNamedWriteable(QueryBuilder.class);
56+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REMOVE_NODE_LEVEL_PLAN)) {
57+
this.estimatedRowSize = in.readVInt();
58+
} else {
59+
this.estimatedRowSize = Objects.requireNonNull(in.readOptionalVInt());
60+
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
61+
in.readOptionalNamedWriteable(PhysicalPlan.class); // for old reducer
62+
}
63+
}
6264
}
6365

6466
@Override
6567
public void writeTo(StreamOutput out) throws IOException {
6668
Source.EMPTY.writeTo(out);
6769
out.writeNamedWriteable(fragment());
6870
out.writeOptionalNamedWriteable(esFilter());
69-
out.writeOptionalVInt(estimatedRowSize());
70-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
71-
out.writeOptionalNamedWriteable(reducer);
71+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REMOVE_NODE_LEVEL_PLAN)) {
72+
out.writeVInt(estimatedRowSize);
73+
} else {
74+
out.writeOptionalVInt(estimatedRowSize());
75+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
76+
out.writeOptionalNamedWriteable(null);// for old reducer
77+
}
7278
}
7379
}
7480

@@ -89,13 +95,9 @@ public Integer estimatedRowSize() {
8995
return estimatedRowSize;
9096
}
9197

92-
public PhysicalPlan reducer() {
93-
return reducer;
94-
}
95-
9698
@Override
9799
protected NodeInfo<FragmentExec> info() {
98-
return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize, reducer);
100+
return NodeInfo.create(this, FragmentExec::new, fragment, esFilter, estimatedRowSize);
99101
}
100102

101103
@Override
@@ -108,24 +110,20 @@ public PhysicalPlan estimateRowSize(State state) {
108110
int estimatedRowSize = state.consumeAllFields(false);
109111
return Objects.equals(estimatedRowSize, this.estimatedRowSize)
110112
? this
111-
: new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
113+
: new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
112114
}
113115

114116
public FragmentExec withFragment(LogicalPlan fragment) {
115-
return Objects.equals(fragment, this.fragment) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
117+
return Objects.equals(fragment, this.fragment) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize);
116118
}
117119

118120
public FragmentExec withFilter(QueryBuilder filter) {
119-
return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize, reducer);
120-
}
121-
122-
public FragmentExec withReducer(PhysicalPlan reducer) {
123-
return Objects.equals(reducer, this.reducer) ? this : new FragmentExec(source(), fragment, esFilter, estimatedRowSize, reducer);
121+
return Objects.equals(filter, this.esFilter) ? this : new FragmentExec(source(), fragment, filter, estimatedRowSize);
124122
}
125123

126124
@Override
127125
public int hashCode() {
128-
return Objects.hash(fragment, esFilter, estimatedRowSize, reducer);
126+
return Objects.hash(fragment, esFilter, estimatedRowSize);
129127
}
130128

131129
@Override
@@ -141,8 +139,7 @@ public boolean equals(Object obj) {
141139
FragmentExec other = (FragmentExec) obj;
142140
return Objects.equals(fragment, other.fragment)
143141
&& Objects.equals(esFilter, other.esFilter)
144-
&& Objects.equals(estimatedRowSize, other.estimatedRowSize)
145-
&& Objects.equals(reducer, other.reducer);
142+
&& Objects.equals(estimatedRowSize, other.estimatedRowSize);
146143
}
147144

148145
@Override
@@ -154,7 +151,6 @@ public String nodeString() {
154151
sb.append(", estimatedRowSize=");
155152
sb.append(estimatedRowSize);
156153
sb.append(", reducer=[");
157-
sb.append(reducer == null ? "" : reducer.toString());
158154
sb.append("], fragment=[<>\n");
159155
sb.append(fragment.toString());
160156
sb.append("<>]]");

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
6161
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
6262
import org.elasticsearch.xpack.esql.core.expression.Attribute;
63+
import org.elasticsearch.xpack.esql.core.util.Holder;
6364
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
6465
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
6566
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
@@ -314,14 +315,7 @@ private void startComputeOnDataNodes(
314315
EsqlExecutionInfo executionInfo,
315316
ComputeListener computeListener
316317
) {
317-
var planWithReducer = configuration.pragmas().nodeLevelReduction() == false
318-
? dataNodePlan
319-
: dataNodePlan.transformUp(FragmentExec.class, f -> {
320-
PhysicalPlan reductionNode = PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan);
321-
return reductionNode == null ? f : f.withReducer(reductionNode);
322-
});
323-
324-
QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(planWithReducer);
318+
QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(dataNodePlan);
325319
var lookupListener = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
326320
// SearchShards API can_match is done in lookupDataNodes
327321
lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> {
@@ -361,7 +355,7 @@ private void startComputeOnDataNodes(
361355
clusterAlias,
362356
node.shardIds,
363357
node.aliasFilters,
364-
planWithReducer,
358+
dataNodePlan,
365359
originalIndices.indices(),
366360
originalIndices.indicesOptions()
367361
),
@@ -450,12 +444,12 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
450444
);
451445

452446
LOGGER.debug("Received physical plan:\n{}", plan);
447+
453448
plan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration, plan);
454449
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
455450
// it's doing this in the planning of EsQueryExec (the source of the data)
456451
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
457452
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(plan);
458-
459453
if (LOGGER.isDebugEnabled()) {
460454
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
461455
}
@@ -785,14 +779,23 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
785779
listener.onFailure(new IllegalStateException("expected a fragment plan for a remote compute; got " + request.plan()));
786780
return;
787781
}
788-
789782
var localExchangeSource = new ExchangeSourceExec(plan.source(), plan.output(), plan.isIntermediateAgg());
790-
FragmentExec fragment = (FragmentExec) fragments.get(0);
783+
Holder<PhysicalPlan> reducePlanHolder = new Holder<>();
784+
if (request.pragmas().nodeLevelReduction()) {
785+
PhysicalPlan dataNodePlan = request.plan();
786+
request.plan()
787+
.forEachUp(
788+
FragmentExec.class,
789+
f -> { reducePlanHolder.set(PlannerUtils.dataNodeReductionPlan(f.fragment(), dataNodePlan)); }
790+
);
791+
}
791792
reducePlan = new ExchangeSinkExec(
792793
plan.source(),
793794
plan.output(),
794795
plan.isIntermediateAgg(),
795-
fragment.reducer() != null ? fragment.reducer().replaceChildren(List.of(localExchangeSource)) : localExchangeSource
796+
reducePlanHolder.get() != null
797+
? reducePlanHolder.get().replaceChildren(List.of(localExchangeSource))
798+
: localExchangeSource
796799
);
797800
} else {
798801
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,13 @@ protected boolean alwaysEmptySource() {
6666
* See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more.
6767
*/
6868
public void testManyTypeConflicts() throws IOException {
69-
testManyTypeConflicts(false, ByteSizeValue.ofBytes(1424048));
69+
testManyTypeConflicts(false, ByteSizeValue.ofBytes(1424046L));
7070
/*
7171
* History:
7272
* 2.3mb - shorten error messages for UnsupportedAttributes #111973
7373
* 1.8mb - cache EsFields #112008
7474
* 1.4mb - string serialization #112929
75+
* 1424046b - remove node-level plan #117422
7576
*/
7677
}
7778

@@ -80,7 +81,7 @@ public void testManyTypeConflicts() throws IOException {
8081
* See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more.
8182
*/
8283
public void testManyTypeConflictsWithParent() throws IOException {
83-
testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774192));
84+
testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774190));
8485
/*
8586
* History:
8687
* 2 gb+ - start
@@ -89,6 +90,7 @@ public void testManyTypeConflictsWithParent() throws IOException {
8990
* 3.1mb - cache EsFields #112008
9091
* 2774214b - string serialization #112929
9192
* 2774192b - remove field attribute #112881
93+
* 2774190b - remove node-level plan #117422
9294
*/
9395
}
9496

@@ -103,11 +105,12 @@ private void testManyTypeConflicts(boolean withParent, ByteSizeValue expected) t
103105
* with a single root field that has many children, grandchildren etc.
104106
*/
105107
public void testDeeplyNestedFields() throws IOException {
106-
ByteSizeValue expected = ByteSizeValue.ofBytes(47252411);
108+
ByteSizeValue expected = ByteSizeValue.ofBytes(47252409);
107109
/*
108110
* History:
109111
* 48223371b - string serialization #112929
110112
* 47252411b - remove field attribute #112881
113+
* 47252409b - remove node-level plan
111114
*/
112115

113116
int depth = 6;
@@ -123,11 +126,12 @@ public void testDeeplyNestedFields() throws IOException {
123126
* with a single root field that has many children, grandchildren etc.
124127
*/
125128
public void testDeeplyNestedFieldsKeepOnlyOne() throws IOException {
126-
ByteSizeValue expected = ByteSizeValue.ofBytes(9425806);
129+
ByteSizeValue expected = ByteSizeValue.ofBytes(9425804);
127130
/*
128131
* History:
129132
* 9426058b - string serialization #112929
130133
* 9425806b - remove field attribute #112881
134+
* 9425804b - remove node-level plan #117422
131135
*/
132136

133137
int depth = 6;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/FragmentExecSerializationTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ public static FragmentExec randomFragmentExec(int depth) {
2222
LogicalPlan fragment = AbstractLogicalPlanSerializationTests.randomChild(depth);
2323
QueryBuilder esFilter = EsqlQueryRequestTests.randomQueryBuilder();
2424
int estimatedRowSize = between(0, Integer.MAX_VALUE);
25-
PhysicalPlan reducer = randomChild(depth);
26-
return new FragmentExec(source, fragment, esFilter, estimatedRowSize, reducer);
25+
return new FragmentExec(source, fragment, esFilter, estimatedRowSize);
2726
}
2827

2928
@Override
@@ -36,15 +35,13 @@ protected FragmentExec mutateInstance(FragmentExec instance) throws IOException
3635
LogicalPlan fragment = instance.fragment();
3736
QueryBuilder esFilter = instance.esFilter();
3837
int estimatedRowSize = instance.estimatedRowSize();
39-
PhysicalPlan reducer = instance.reducer();
40-
switch (between(0, 3)) {
38+
switch (between(0, 2)) {
4139
case 0 -> fragment = randomValueOtherThan(fragment, () -> AbstractLogicalPlanSerializationTests.randomChild(0));
4240
case 1 -> esFilter = randomValueOtherThan(esFilter, EsqlQueryRequestTests::randomQueryBuilder);
4341
case 2 -> estimatedRowSize = randomValueOtherThan(estimatedRowSize, () -> between(0, Integer.MAX_VALUE));
44-
case 3 -> reducer = randomValueOtherThan(reducer, () -> randomChild(0));
4542
default -> throw new UnsupportedEncodingException();
4643
}
47-
return new FragmentExec(instance.source(), fragment, esFilter, estimatedRowSize, reducer);
44+
return new FragmentExec(instance.source(), fragment, esFilter, estimatedRowSize);
4845
}
4946

5047
@Override

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ private PhysicalPlan plan(String query, QueryBuilder restFilter) {
305305
// System.out.println("physical\n" + physical);
306306
physical = physical.transformUp(
307307
FragmentExec.class,
308-
f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize(), f.reducer())
308+
f -> new FragmentExec(f.source(), f.fragment(), restFilter, f.estimatedRowSize())
309309
);
310310
physical = physicalPlanOptimizer.optimize(physical);
311311
// System.out.println("optimized\n" + physical);

0 commit comments

Comments
 (0)