Skip to content

Commit 0955a82

Browse files
authored
ES|QL - Avoid retrieving unnecessary fields on node-reduce phase (#137920)
1 parent 26a95cb commit 0955a82

File tree

3 files changed

+84
-13
lines changed

3 files changed

+84
-13
lines changed

docs/changelog/137920.yaml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
pr: 137920
2+
summary: Avoid retrieving unnecessary fields on node-reduce phase
3+
area: "ES|QL"
4+
type: enhancement
5+
issues:
6+
- 134363

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlReductionLateMaterializationTestCase.java

Lines changed: 64 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -26,14 +26,14 @@
2626

2727
import java.util.ArrayList;
2828
import java.util.Collection;
29+
import java.util.Set;
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.stream.IntStream;
3132

3233
import static org.elasticsearch.xpack.esql.EsqlTestUtils.singleValue;
3334
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
3435
import static org.hamcrest.Matchers.equalTo;
3536
import static org.hamcrest.Matchers.greaterThan;
36-
import static org.hamcrest.Matchers.hasSize;
3737

3838
/**
3939
* Verifies that the {@link org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator} is optimized into the reduce driver instead
@@ -76,12 +76,17 @@ public void setupIndex() throws Exception {
7676
mapping.startObject("sorted").field("type", "long").endObject();
7777
mapping.startObject("filtered").field("type", "long").endObject();
7878
mapping.startObject("read").field("type", "long").endObject();
79+
mapping.startObject("more").field("type", "long").endObject();
80+
mapping.startObject("some_more").field("type", "long").endObject();
7981
}
8082
mapping.endObject();
8183
client().admin().indices().prepareCreate("test").setSettings(indexSettings(10, 0)).setMapping(mapping.endObject()).get();
8284

8385
var builders = IntStream.range(0, 1024)
84-
.mapToObj(i -> prepareIndex("test").setId(Integer.toString(i)).setSource("read", i, "sorted", i * 2, "filtered", i * 3))
86+
.mapToObj(
87+
i -> prepareIndex("test").setId(Integer.toString(i))
88+
.setSource("read", i, "sorted", i * 2, "filtered", i * 3, "more", i * 4, "some_more", i * 5)
89+
)
8590
.toList();
8691
indexRandom(true, builders);
8792
}
@@ -101,20 +106,64 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
101106
}
102107

103108
public void testNoPushdowns() throws Exception {
104-
testLateMaterializationAfterReduceTopN("from test | sort sorted + 1 desc | limit 3 | stats sum(read)");
109+
testLateMaterializationAfterReduceTopN(
110+
"from test | sort sorted + 1 desc | limit 3 | stats sum(read)",
111+
Set.of("sorted"),
112+
Set.of("read")
113+
);
105114
}
106115

107116
public void testPushdownTopN() throws Exception {
108-
testLateMaterializationAfterReduceTopN("from test | sort sorted desc | limit 3 | stats sum(read)");
117+
testLateMaterializationAfterReduceTopN(
118+
"from test | sort sorted desc | limit 3 | stats sum(read)",
119+
Set.of("sorted"),
120+
Set.of("read")
121+
);
109122
}
110123

111-
private void testLateMaterializationAfterReduceTopN(String query) throws Exception {
124+
public void testPushdownTopNMultipleSortedFields() throws Exception {
125+
testLateMaterializationAfterReduceTopN(
126+
"from test | sort sorted desc, more asc | limit 3 | stats sum(read)",
127+
Set.of("sorted", "more"),
128+
Set.of("read")
129+
);
130+
}
131+
132+
public void testPushdownTopNMultipleRetrievedFields() throws Exception {
133+
testLateMaterializationAfterReduceTopN(
134+
"from test | sort sorted desc, more asc | limit 3 | stats x = sum(read), y = max(some_more)",
135+
Set.of("sorted", "more"),
136+
Set.of("read", "some_more")
137+
);
138+
}
139+
140+
public void testPushdownTopFilterOnNonProjected() throws Exception {
141+
testLateMaterializationAfterReduceTopN(
142+
"from test | where filtered > 0 | sort sorted desc | limit 3 | stats sum(read)",
143+
Set.of("sorted"),
144+
Set.of("read")
145+
);
146+
}
147+
148+
public void testPushdownTopFilterOnProjected() throws Exception {
149+
testLateMaterializationAfterReduceTopN(
150+
"from test | sort sorted desc | limit 3 | where filtered > 0 | stats sum(read)",
151+
Set.of("sorted"),
152+
Set.of("read", "filtered")
153+
);
154+
}
155+
156+
private void testLateMaterializationAfterReduceTopN(
157+
String query,
158+
Set<String> expectedDataLoadedFields,
159+
Set<String> expectedNodeReduceFields
160+
) throws Exception {
112161
setupIndex();
113162
try (var result = sendQuery(query)) {
114163
assertThat(result.isRunning(), equalTo(false));
115164
assertThat(result.isPartial(), equalTo(false));
116-
assertSingleKeyFieldExtracted(result, "data");
117-
assertSingleKeyFieldExtracted(result, "node_reduce");
165+
assertSingleKeyFieldExtracted(result, "data", expectedDataLoadedFields);
166+
assertSingleKeyFieldExtracted(result, "node_reduce", expectedNodeReduceFields);
118167
var page = singleValue(result.pages());
119168
assertThat(page.getPositionCount(), equalTo(1));
120169
LongVectorBlock block = page.getBlock(0);
@@ -123,7 +172,7 @@ private void testLateMaterializationAfterReduceTopN(String query) throws Excepti
123172
}
124173
}
125174

126-
private static void assertSingleKeyFieldExtracted(EsqlQueryResponse response, String driverName) {
175+
private static void assertSingleKeyFieldExtracted(EsqlQueryResponse response, String driverName, Set<String> expectedLoadedFields) {
127176
long totalValuesLoader = 0;
128177
for (var driverProfile : response.profile().drivers().stream().filter(d -> d.description().equals(driverName)).toList()) {
129178
OperatorStatus operatorStatus = singleValue(
@@ -140,7 +189,13 @@ private static void assertSingleKeyFieldExtracted(EsqlQueryResponse response, St
140189
// This can happen if the indexRandom created dummy documents which led to empty segments.
141190
continue;
142191
}
143-
assertThat("A single reader should have been built", status.readersBuilt().keySet(), hasSize(1));
192+
assertThat(status.readersBuilt().size(), equalTo(expectedLoadedFields.size()));
193+
for (String field : status.readersBuilt().keySet()) {
194+
assertTrue(
195+
"Field " + field + " was not expected to be loaded in driver " + driverName,
196+
expectedLoadedFields.stream().anyMatch(field::contains)
197+
);
198+
}
144199
}
145200
assertThat("Values should have been loaded", totalValuesLoader, greaterThan(0L));
146201
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/LateMaterializationPlanner.java

Lines changed: 14 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.index.IndexMode;
1111
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1213
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1314
import org.elasticsearch.xpack.esql.core.tree.Source;
1415
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
@@ -30,6 +31,7 @@
3031
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
3132
import org.elasticsearch.xpack.esql.stats.SearchStats;
3233

34+
import java.util.ArrayList;
3335
import java.util.List;
3436
import java.util.Optional;
3537
import java.util.function.Function;
@@ -95,8 +97,9 @@ public static Optional<ReductionPlan> planReduceDriverTopN(
9597
}
9698

9799
LocalPhysicalOptimizerContext context = contextFactory.apply(SEARCH_STATS_TOP_N_REPLACEMENT);
98-
List<Attribute> expectedDataOutput = toPhysical(topN, context).output();
99-
Attribute doc = expectedDataOutput.stream().filter(EsQueryExec::isDocAttribute).findFirst().orElse(null);
100+
101+
List<Attribute> physicalPlanOutput = toPhysical(topN, context).output();
102+
Attribute doc = physicalPlanOutput.stream().filter(EsQueryExec::isDocAttribute).findFirst().orElse(null);
100103
if (doc == null) {
101104
return Optional.empty();
102105
}
@@ -114,8 +117,15 @@ public static Optional<ReductionPlan> planReduceDriverTopN(
114117
return Optional.empty();
115118
}
116119

117-
// We need to add the doc attribute to the project since otherwise when the fragment is converted to a physical plan for the data
118-
// driver, the resulting ProjectExec won't have the doc attribute in its output, which is needed by the reduce driver.
120+
AttributeSet orderRefsSet = AttributeSet.of(topN.order().stream().flatMap(o -> o.references().stream()).toList());
121+
// Get the output from the physical plan below the TopN, and filter it to only the attributes needed for the final output (either
122+
// because they are in the top-level Project's output, or because they are needed for ordering); the doc attribute is always kept.
123+
List<Attribute> expectedDataOutput = new ArrayList<>();
124+
for (Attribute a : physicalPlanOutput) {
125+
if (topLevelProject.outputSet().contains(a) || orderRefsSet.contains(a) || EsQueryExec.isDocAttribute(a)) {
126+
expectedDataOutput.add(a);
127+
}
128+
}
119129
var updatedFragment = new Project(Source.EMPTY, withAddedDocToRelation, expectedDataOutput);
120130
FragmentExec updatedFragmentExec = fragmentExec.withFragment(updatedFragment);
121131
ExchangeSinkExec updatedDataPlan = originalPlan.replaceChild(updatedFragmentExec);

0 commit comments

Comments
 (0)