Skip to content

Commit 1e67573

Browse files
authored
[ESQL] Enable distributed pipeline breakers for external sources via FragmentExec (elastic#143696)
Wrap ExternalRelation in FragmentExec (like EsRelation) so that pipeline breakers (Aggregate, Limit, TopN) above external sources are naturally distributed to data nodes via ExchangeExec. Previously, ExternalRelation was mapped directly to ExternalSourceExec on the coordinator, bypassing the FragmentExec-based distribution that EsRelation uses. This meant all three data-node optimization stages were skipped: LocalLogicalPlanOptimizer, LocalMapper, and LocalPhysicalPlanOptimizer (including filter pushdown via FilterPushdownRegistry). The filter pushdown is especially critical since PushFiltersToSource creates an opaque pushedFilter on ExternalSourceExec that is not serializable and must be created on the same JVM where the operator runs. With this change, on data nodes localPlan() expands the FragmentExec through LocalMapper into ExternalSourceExec, enabling local optimizations. The coordinator-only path correctly discovers and injects splits after localPlan expansion. AdaptiveStrategy now also recognizes TopN as a pipeline breaker for distribution decisions. ReplaceFieldWithConstantOrNull is taught to retain fields from ExternalRelation (like it already does for lookup index fields) since SearchStats is empty for external sources. ExchangeExec nodes wrapping external FragmentExec are collapsed and TopNExec InputOrdering is reset when falling back to coordinator-only execution. Developed using AI-assisted tooling
1 parent 20ca2f4 commit 1e67573

File tree

16 files changed

+643
-68
lines changed

16 files changed

+643
-68
lines changed

docs/changelog/143696.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: []
3+
pr: 143696
4+
summary: Enable distributed pipeline breakers for external sources via `FragmentExec`
5+
type: enhancement

x-pack/plugin/esql/qa/testFixtures/src/main/resources/external-basic.csv-spec

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,3 +212,34 @@ emp_no:integer | height:double | height_float_rounded:double | height.scaled_flo
212212
10004 | 1.78 | 1.78 | 1.78 | 1.78
213213
10005 | 2.05 | 2.05 | 2.05 | 2.05
214214
;
215+
216+
// Pipeline breaker distribution tests
217+
// These exercise distributed execution paths for pipeline breakers (STATS, TopN, LIMIT)
218+
// across all distribution modes via ExternalDistributedSpecIT
219+
220+
topNSortBySalaryDesc
221+
required_capability: external_command
222+
EXTERNAL "{{employees}}"
223+
| KEEP emp_no, first_name, salary
224+
| SORT salary DESC
225+
| LIMIT 3;
226+
227+
emp_no:integer | first_name:keyword | salary:integer
228+
10029 | "Otmar" | 74999
229+
10045 | "Moss" | 74970
230+
10007 | "Tzvetan" | 74572
231+
;
232+
233+
topNFilteredSortBySalary
234+
required_capability: external_command
235+
EXTERNAL "{{employees}}"
236+
| WHERE gender == "F"
237+
| KEEP emp_no, first_name, salary
238+
| SORT salary DESC
239+
| LIMIT 3;
240+
241+
emp_no:integer | first_name:keyword | salary:integer
242+
10007 | "Tzvetan" | 74572
243+
10027 | "Divier" | 73851
244+
10099 | "Valter" | 73578
245+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.esql.plan.logical.CompoundOutputEval;
2222
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
2323
import org.elasticsearch.xpack.esql.plan.logical.Eval;
24+
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
2425
import org.elasticsearch.xpack.esql.plan.logical.Filter;
2526
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2627
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
@@ -47,7 +48,9 @@ public class ReplaceFieldWithConstantOrNull extends ParameterizedRule<LogicalPla
4748
@Override
4849
public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLogicalOptimizerContext) {
4950
var lookupFieldsBuilder = AttributeSet.builder();
51+
var externalFieldsBuilder = AttributeSet.builder();
5052
Map<Attribute, Expression> attrToConstant = new HashMap<>();
53+
plan.forEachUp(ExternalRelation.class, external -> externalFieldsBuilder.addAll(external.output()));
5154
plan.forEachUp(EsRelation.class, esRelation -> {
5255
// Looking for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
5356
// is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD.
@@ -77,14 +80,16 @@ else if (esRelation.indexMode() == IndexMode.STANDARD) {
7780
}
7881
});
7982
AttributeSet lookupFields = lookupFieldsBuilder.build();
83+
AttributeSet externalFields = externalFieldsBuilder.build();
8084

8185
// Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead.
82-
// Also retain fields from lookup indices because we do not have stats for these.
86+
// Also retain fields from lookup indices and external sources because we do not have stats for these.
8387
Predicate<FieldAttribute> shouldBeRetained = f -> f.field() instanceof PotentiallyUnmappedKeywordEsField
8488
// The source (or doc) field is added to the relation output as a hack to enable late materialization in the reduce driver.
8589
|| EsQueryExec.isDocAttribute(f)
8690
|| localLogicalOptimizerContext.searchStats().exists(f.fieldName())
87-
|| lookupFields.contains(f);
91+
|| lookupFields.contains(f)
92+
|| externalFields.contains(f);
8893

8994
return plan.transformUp(p -> replaceWithNullOrConstant(p, shouldBeRetained, attrToConstant));
9095
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1414
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
1515
import org.elasticsearch.xpack.esql.plan.logical.Eval;
16+
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
1617
import org.elasticsearch.xpack.esql.plan.logical.Filter;
1718
import org.elasticsearch.xpack.esql.plan.logical.Grok;
1819
import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
@@ -88,6 +89,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
8889
Enrich.ENTRY,
8990
EsRelation.ENTRY,
9091
Eval.ENTRY,
92+
ExternalRelation.ENTRY,
9193
Filter.ENTRY,
9294
Grok.ENTRY,
9395
InlineJoin.ENTRY,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ExternalRelation.java

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,35 @@
66
*/
77
package org.elasticsearch.xpack.esql.plan.logical;
88

9+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
10+
import org.elasticsearch.common.io.stream.StreamInput;
911
import org.elasticsearch.common.io.stream.StreamOutput;
1012
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1113
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1214
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
1315
import org.elasticsearch.xpack.esql.core.tree.Source;
1416
import org.elasticsearch.xpack.esql.datasources.FileSet;
17+
import org.elasticsearch.xpack.esql.datasources.spi.SimpleSourceMetadata;
1518
import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
19+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
1620
import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec;
1721

22+
import java.io.IOException;
1823
import java.util.List;
24+
import java.util.Map;
1925
import java.util.Objects;
2026

2127
/**
2228
* Logical plan node for external data source relations (e.g., Iceberg table, Parquet file).
23-
* This plan node is executed on the coordinator only (no dispatch to data nodes).
2429
* <p>
25-
* Unlike EsRelation which wraps into FragmentExec for data node dispatch,
26-
* ExternalRelation maps directly to physical source operators via LocalMapper,
27-
* similar to how LocalRelation works.
30+
* Like {@link EsRelation}, the Mapper wraps this into a {@link org.elasticsearch.xpack.esql.plan.physical.FragmentExec}
31+
* so that pipeline breakers (Aggregate, Limit, TopN) above it are distributed to data nodes
32+
* via ExchangeExec. On data nodes, {@code localPlan()} expands the FragmentExec through
33+
* LocalMapper into {@link ExternalSourceExec}, enabling local optimizations such as
34+
* filter pushdown via FilterPushdownRegistry.
2835
* <p>
29-
* This class provides a source-agnostic logical plan node for external data sources.
30-
* It can represent any external source (Iceberg, Parquet, CSV, etc.) without requiring
31-
* source-specific subclasses in core ESQL code.
36+
* The {@link ExecutesOn.Coordinator} marker is retained for logical plan validation
37+
* (e.g., Enrich/Join hoist rules that inspect whether a relation executes on the coordinator).
3238
* <p>
3339
* The source-specific metadata is stored in the {@link SourceMetadata} interface, which
3440
* provides:
@@ -45,6 +51,12 @@
4551
*/
4652
public class ExternalRelation extends LeafPlan implements ExecutesOn.Coordinator {
4753

54+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
55+
LogicalPlan.class,
56+
"ExternalRelation",
57+
ExternalRelation::readFrom
58+
);
59+
4860
private final String sourcePath;
4961
private final List<Attribute> output;
5062
private final SourceMetadata metadata;
@@ -71,14 +83,32 @@ public ExternalRelation(Source source, String sourcePath, SourceMetadata metadat
7183
this(source, sourcePath, metadata, output, FileSet.UNRESOLVED);
7284
}
7385

86+
private static ExternalRelation readFrom(StreamInput in) throws IOException {
87+
var source = Source.readFrom((PlanStreamInput) in);
88+
String sourcePath = in.readString();
89+
String sourceType = in.readString();
90+
var output = in.readNamedWriteableCollectionAsList(Attribute.class);
91+
@SuppressWarnings("unchecked")
92+
Map<String, Object> config = (Map<String, Object>) in.readGenericValue();
93+
@SuppressWarnings("unchecked")
94+
Map<String, Object> sourceMetadata = (Map<String, Object>) in.readGenericValue();
95+
var metadata = new SimpleSourceMetadata(output, sourceType, sourcePath, null, null, sourceMetadata, config);
96+
return new ExternalRelation(source, sourcePath, metadata, output, FileSet.UNRESOLVED);
97+
}
98+
7499
@Override
75-
public void writeTo(StreamOutput out) {
76-
throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations");
100+
public void writeTo(StreamOutput out) throws IOException {
101+
Source.EMPTY.writeTo(out);
102+
out.writeString(sourcePath);
103+
out.writeString(metadata.sourceType());
104+
out.writeNamedWriteableCollection(output);
105+
out.writeGenericValue(metadata.config());
106+
out.writeGenericValue(metadata.sourceMetadata());
77107
}
78108

79109
@Override
80110
public String getWriteableName() {
81-
throw new UnsupportedOperationException("ExternalRelation is not yet serializable for cross-cluster operations");
111+
return ENTRY.name;
82112
}
83113

84114
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ExternalSourceExec.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.xpack.esql.datasources.spi.ExternalSplit;
2020
import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
2121
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
22-
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2322

2423
import java.io.IOException;
2524
import java.util.List;
@@ -29,7 +28,7 @@
2928
/**
3029
* Generic physical plan node for reading from external data sources (e.g., Iceberg tables, Parquet files).
3130
* <p>
32-
* This is the unified physical plan node for all external sources, replacing source-specific nodes
31+
* This is the unified physical plan node for all external sources, replacing source-specific nodes.
3332
* It uses generic maps for configuration and metadata to avoid leaking
3433
* source-specific types (like S3Configuration) into core ESQL code.
3534
* <p>
@@ -40,13 +39,13 @@
4039
* <li><b>Opaque metadata</b>: Source-specific data (native schema, etc.) is stored in
4140
* {@link #sourceMetadata()} and passed through without core understanding it</li>
4241
* <li><b>Opaque pushed filter</b>: The {@link #pushedFilter()} is an opaque Object that only
43-
* the source-specific operator factory interprets. It is NOT serialized because external
44-
* sources execute on coordinator only ({@link ExecutesOn.Coordinator})</li>
45-
* <li><b>Coordinator-only execution</b>: External sources run entirely on the coordinator node,
46-
* so no cross-node serialization of source-specific data is needed</li>
42+
* the source-specific operator factory interprets. It is NOT serialized; it is created
43+
* locally on each data node by the LocalPhysicalPlanOptimizer via FilterPushdownRegistry</li>
44+
* <li><b>Data node execution</b>: Created on data nodes by LocalMapper from
45+
* {@link org.elasticsearch.xpack.esql.plan.logical.ExternalRelation} inside FragmentExec</li>
4746
* </ul>
4847
*/
49-
public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, ExecutesOn.Coordinator {
48+
public class ExternalSourceExec extends LeafExec implements EstimatesRowSize {
5049

5150
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
5251
PhysicalPlan.class,
@@ -61,10 +60,10 @@ public class ExternalSourceExec extends LeafExec implements EstimatesRowSize, Ex
6160
private final List<Attribute> attributes;
6261
private final Map<String, Object> config;
6362
private final Map<String, Object> sourceMetadata;
64-
private final Object pushedFilter; // Opaque filter - NOT serialized (coordinator only)
65-
private final int pushedLimit; // NOT serialized (coordinator only)
63+
private final Object pushedFilter; // Opaque filter - NOT serialized, created locally on data nodes
64+
private final int pushedLimit; // NOT serialized, set locally on data nodes
6665
private final Integer estimatedRowSize;
67-
private final FileSet fileSet; // NOT serialized - coordinator only
66+
private final FileSet fileSet; // NOT serialized - resolved on coordinator, null on data nodes
6867
private final List<ExternalSplit> splits;
6968

7069
public ExternalSourceExec(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1515
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
1616
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
17+
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
1718
import org.elasticsearch.xpack.esql.plan.logical.Filter;
1819
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
1920
import org.elasticsearch.xpack.esql.plan.logical.Limit;
@@ -71,8 +72,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) {
7172
return new EsSourceExec(esRelation);
7273
}
7374

74-
// ExternalRelation is handled by MapperUtils.mapLeaf()
75-
// via its toPhysicalExec() method, bypassing FragmentExec/ExchangeExec dispatch
75+
if (leaf instanceof ExternalRelation external) {
76+
return external.toPhysicalExec();
77+
}
78+
7679
return MapperUtils.mapLeaf(leaf);
7780
}
7881

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
1717
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1818
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
19+
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
1920
import org.elasticsearch.xpack.esql.plan.logical.Filter;
2021
import org.elasticsearch.xpack.esql.plan.logical.Fork;
2122
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
@@ -86,8 +87,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) {
8687
return new FragmentExec(esRelation);
8788
}
8889

89-
// ExternalRelation is handled by MapperUtils.mapLeaf()
90-
// which calls toPhysicalExec() to create coordinator-only source operators
90+
if (leaf instanceof ExternalRelation external) {
91+
return new FragmentExec(external);
92+
}
93+
9194
return MapperUtils.mapLeaf(leaf);
9295
}
9396

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
1717
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1818
import org.elasticsearch.xpack.esql.plan.logical.Eval;
19-
import org.elasticsearch.xpack.esql.plan.logical.ExternalRelation;
2019
import org.elasticsearch.xpack.esql.plan.logical.Filter;
2120
import org.elasticsearch.xpack.esql.plan.logical.Grok;
2221
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
@@ -70,12 +69,6 @@ static PhysicalPlan mapLeaf(LeafPlan p) {
7069
return new LocalSourceExec(local.source(), local.output(), local.supplier());
7170
}
7271

73-
// External data sources (Iceberg, Parquet, etc.)
74-
// These are executed on the coordinator only, bypassing FragmentExec/ExchangeExec dispatch
75-
if (p instanceof ExternalRelation external) {
76-
return external.toPhysicalExec();
77-
}
78-
7972
// Commands
8073
if (p instanceof ShowInfo showInfo) {
8174
return new ShowExec(showInfo.source(), showInfo.output(), showInfo.values());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/AdaptiveStrategy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
/**
2020
* Adaptive distribution strategy for external sources.
2121
* <p>
22-
* Distributes when the plan contains aggregations and there are multiple splits,
23-
* or when the split count exceeds the number of eligible nodes.
24-
* Stays on the coordinator for single splits or LIMIT-only plans.
22+
* Distributes when the plan contains pipeline breakers (aggregations, TopN)
23+
* and there are multiple splits, or when the split count exceeds the number
24+
* of eligible nodes. Stays on the coordinator for single splits or LIMIT-only plans.
2525
*/
2626
public final class AdaptiveStrategy implements ExternalDistributionStrategy {
2727

@@ -56,10 +56,10 @@ public ExternalDistributionPlan planDistribution(ExternalDistributionContext con
5656
return ExternalDistributionPlan.LOCAL;
5757
}
5858

59-
boolean hasAggregation = plan.anyMatch(n -> n instanceof AggregateExec);
59+
boolean hasPipelineBreaker = plan.anyMatch(n -> n instanceof AggregateExec || n instanceof TopNExec);
6060
boolean manySplits = splits.size() > nodes.size();
6161

62-
if (hasAggregation || manySplits) {
62+
if (hasPipelineBreaker || manySplits) {
6363
boolean allHaveSize = true;
6464
for (ExternalSplit split : splits) {
6565
if (split.estimatedSizeInBytes() <= 0) {

0 commit comments

Comments
 (0)