Skip to content

Commit 1c14843

Browse files
authored
Make intermediate LocalRelation limit configurable (#135339)
This makes configurable the limit that the intermediate LocalRelation used in INLINE STATS execution can grow to. By default, this can grow up to .1% of the heap. Related #134455
1 parent b7b890f commit 1c14843

File tree

24 files changed

+106
-129
lines changed

24 files changed

+106
-129
lines changed

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import org.elasticsearch.xcontent.XContentBuilder;
1717
import org.elasticsearch.xcontent.json.JsonXContent;
1818
import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase;
19-
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
19+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
2020
import org.elasticsearch.xpack.esql.plugin.ComputeService;
2121
import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase;
2222
import org.junit.Before;
@@ -56,7 +56,7 @@ public void configureChunks() throws IOException {
5656
boolean smallChunks = randomBoolean();
5757
Request request = new Request("PUT", "/_cluster/settings");
5858
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject("persistent");
59-
builder.field(PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getKey(), smallChunks ? "1kb" : null);
59+
builder.field(PlannerSettings.VALUES_LOADING_JUMBO_SIZE.getKey(), smallChunks ? "1kb" : null);
6060
request.setJsonEntity(Strings.toString(builder.endObject().endObject()));
6161
assertOK(client().performRequest(request));
6262
}

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
import static org.elasticsearch.test.MapMatcher.assertMap;
6060
import static org.elasticsearch.test.MapMatcher.matchesMap;
6161
import static org.elasticsearch.xpack.esql.core.type.DataType.isMillisOrNanos;
62-
import static org.elasticsearch.xpack.esql.planner.PhysicalSettings.LUCENE_TOPN_LIMIT;
62+
import static org.elasticsearch.xpack.esql.planner.PlannerSettings.LUCENE_TOPN_LIMIT;
6363
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
6464
import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile;
6565
import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse;

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.lucene.BytesRefs;
2525
import org.elasticsearch.common.regex.Regex;
2626
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.common.unit.ByteSizeValue;
2728
import org.elasticsearch.common.util.BigArrays;
2829
import org.elasticsearch.common.util.concurrent.EsExecutors;
2930
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -35,6 +36,7 @@
3536
import org.elasticsearch.compute.data.DoubleBlock;
3637
import org.elasticsearch.compute.data.IntBlock;
3738
import org.elasticsearch.compute.data.LongBlock;
39+
import org.elasticsearch.compute.lucene.DataPartitioning;
3840
import org.elasticsearch.core.PathUtils;
3941
import org.elasticsearch.core.SuppressForbidden;
4042
import org.elasticsearch.core.Tuple;
@@ -95,6 +97,7 @@
9597
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
9698
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
9799
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
100+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
98101
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
99102
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
100103
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -424,6 +427,13 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
424427

425428
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));
426429

430+
public static final PlannerSettings TEST_PLANNER_SETTINGS = new PlannerSettings(
431+
DataPartitioning.AUTO,
432+
ByteSizeValue.ofMb(1),
433+
10_000,
434+
ByteSizeValue.ofMb(1)
435+
);
436+
427437
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
428438
createMockTransportService(),
429439
mock(SearchService.class),
@@ -433,7 +443,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
433443
mock(IndexNameExpressionResolver.class),
434444
null,
435445
new InferenceService(mock(Client.class)),
436-
new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY)
446+
new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY),
447+
TEST_PLANNER_SETTINGS
437448
);
438449

439450
private static TransportService createMockTransportService() {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
7272
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
7373
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
74-
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
74+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
7575
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
7676
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
7777
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -358,7 +358,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
358358
);
359359
}
360360
ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
361-
PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
361+
PlannerSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
362362
fieldInfos,
363363
List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
364364
throw new IllegalStateException("can't load source here");

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
11-
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
11+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
1212
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
1313
import org.elasticsearch.xpack.esql.session.Configuration;
1414
import org.elasticsearch.xpack.esql.stats.SearchStats;
1515

1616
public record LocalPhysicalOptimizerContext(
17-
PhysicalSettings physicalSettings,
17+
PlannerSettings plannerSettings,
1818
EsqlFlags flags,
1919
Configuration configuration,
2020
FoldContext foldCtx,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
3030
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
3131
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
32-
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
32+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
3333

3434
import java.util.ArrayList;
3535
import java.util.LinkedHashMap;
@@ -66,7 +66,7 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi
6666
@Override
6767
protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
6868
Pushable pushable = evaluatePushable(
69-
ctx.physicalSettings(),
69+
ctx.plannerSettings(),
7070
ctx.foldCtx(),
7171
topNExec,
7272
LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags())
@@ -132,7 +132,7 @@ public PhysicalPlan rewrite(TopNExec topNExec) {
132132
}
133133

134134
private static Pushable evaluatePushable(
135-
PhysicalSettings physicalSettings,
135+
PlannerSettings plannerSettings,
136136
FoldContext ctx,
137137
TopNExec topNExec,
138138
LucenePushdownPredicates lucenePushdownPredicates
@@ -141,14 +141,14 @@ private static Pushable evaluatePushable(
141141
if (child instanceof EsQueryExec queryExec
142142
&& queryExec.canPushSorts()
143143
&& canPushDownOrders(topNExec.order(), lucenePushdownPredicates)
144-
&& canPushLimit(topNExec, physicalSettings)) {
144+
&& canPushLimit(topNExec, plannerSettings)) {
145145
// With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field
146146
return new PushableQueryExec(queryExec);
147147
}
148148
if (child instanceof EvalExec evalExec
149149
&& evalExec.child() instanceof EsQueryExec queryExec
150150
&& queryExec.canPushSorts()
151-
&& canPushLimit(topNExec, physicalSettings)) {
151+
&& canPushLimit(topNExec, plannerSettings)) {
152152
// When we have an EVAL between the FROM and the SORT, we consider pushing down if the sort is on a field and/or
153153
// a distance function defined in the EVAL. We also move the EVAL to after the SORT.
154154
List<Order> orders = topNExec.order();
@@ -220,8 +220,8 @@ private static boolean canPushDownOrders(List<Order> orders, LucenePushdownPredi
220220
return orders.stream().allMatch(o -> isSortableAttribute.apply(o.child(), lucenePushdownPredicates));
221221
}
222222

223-
private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) {
224-
return Foldables.limitValue(topn.limit(), topn.sourceText()) <= physicalSettings.luceneTopNLimit();
223+
private static boolean canPushLimit(TopNExec topn, PlannerSettings plannerSettings) {
224+
return Foldables.limitValue(topn.limit(), topn.sourceText()) <= plannerSettings.luceneTopNLimit();
225225
}
226226

227227
private static List<EsQueryExec.Sort> buildFieldSorts(List<Order> orders) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,17 +144,17 @@ public boolean hasReferences() {
144144
}
145145

146146
private final List<ShardContext> shardContexts;
147-
private final PhysicalSettings physicalSettings;
147+
private final PlannerSettings plannerSettings;
148148

149149
public EsPhysicalOperationProviders(
150150
FoldContext foldContext,
151151
List<ShardContext> shardContexts,
152152
AnalysisRegistry analysisRegistry,
153-
PhysicalSettings physicalSettings
153+
PlannerSettings plannerSettings
154154
) {
155155
super(foldContext, analysisRegistry);
156156
this.shardContexts = shardContexts;
157-
this.physicalSettings = physicalSettings;
157+
this.plannerSettings = plannerSettings;
158158
}
159159

160160
@Override
@@ -176,7 +176,7 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
176176
}
177177
var fields = extractFields(fieldExtractExec);
178178
return source.with(
179-
new ValuesSourceReaderOperator.Factory(physicalSettings.valuesLoadingJumboSize(), fields, readers, docChannel),
179+
new ValuesSourceReaderOperator.Factory(plannerSettings.valuesLoadingJumboSize(), fields, readers, docChannel),
180180
layout.build()
181181
);
182182
}
@@ -299,7 +299,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
299299
luceneFactory = new LuceneTopNSourceOperator.Factory(
300300
shardContexts,
301301
querySupplier(esQueryExec.query()),
302-
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
302+
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
303303
context.queryPragmas().taskConcurrency(),
304304
context.pageSize(rowEstimatedSize),
305305
limit,
@@ -319,7 +319,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
319319
luceneFactory = new LuceneSourceOperator.Factory(
320320
shardContexts,
321321
querySupplier(esQueryExec.queryBuilderAndTags()),
322-
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
322+
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
323323
context.autoPartitioningStrategy().get(),
324324
context.queryPragmas().taskConcurrency(),
325325
context.pageSize(rowEstimatedSize),
@@ -388,7 +388,7 @@ public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext cont
388388
return new LuceneCountOperator.Factory(
389389
shardContexts,
390390
querySupplier(queryBuilder),
391-
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
391+
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
392392
context.queryPragmas().taskConcurrency(),
393393
List.of(),
394394
limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx())
Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
/**
2020
* Values for cluster level settings used in physical planning.
2121
*/
22-
public class PhysicalSettings {
22+
public class PlannerSettings {
2323
public static final Setting<DataPartitioning> DEFAULT_DATA_PARTITIONING = Setting.enumSetting(
2424
DataPartitioning.class,
2525
"esql.default_data_partitioning",
@@ -45,26 +45,42 @@ public class PhysicalSettings {
4545
Setting.Property.Dynamic
4646
);
4747

48+
public static final Setting<ByteSizeValue> INTERMEDIATE_LOCAL_RELATION_MAX_SIZE = Setting.memorySizeSetting(
49+
"esql.intermediate_local_relation_max_size",
50+
"0.1%",
51+
Setting.Property.NodeScope,
52+
Setting.Property.Dynamic
53+
);
54+
4855
private volatile DataPartitioning defaultDataPartitioning;
4956
private volatile ByteSizeValue valuesLoadingJumboSize;
5057
private volatile int luceneTopNLimit;
58+
private volatile ByteSizeValue intermediateLocalRelationMaxSize;
5159

5260
/**
5361
* Ctor for prod that listens for updates from the {@link ClusterService}.
5462
*/
55-
public PhysicalSettings(ClusterService clusterService) {
56-
clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
57-
clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v);
58-
clusterService.getClusterSettings().initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v);
63+
public PlannerSettings(ClusterService clusterService) {
64+
var clusterSettings = clusterService.getClusterSettings();
65+
clusterSettings.initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
66+
clusterSettings.initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v);
67+
clusterSettings.initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v);
68+
clusterSettings.initializeAndWatch(INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, v -> this.intermediateLocalRelationMaxSize = v);
5969
}
6070

6171
/**
6272
* Ctor for testing.
6373
*/
64-
public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, int luceneTopNLimit) {
74+
public PlannerSettings(
75+
DataPartitioning defaultDataPartitioning,
76+
ByteSizeValue valuesLoadingJumboSize,
77+
int luceneTopNLimit,
78+
ByteSizeValue intermediateLocalRelationMaxSize
79+
) {
6580
this.defaultDataPartitioning = defaultDataPartitioning;
6681
this.valuesLoadingJumboSize = valuesLoadingJumboSize;
6782
this.luceneTopNLimit = luceneTopNLimit;
83+
this.intermediateLocalRelationMaxSize = intermediateLocalRelationMaxSize;
6884
}
6985

7086
public DataPartitioning defaultDataPartitioning() {
@@ -92,4 +108,8 @@ public ByteSizeValue valuesLoadingJumboSize() {
92108
public int luceneTopNLimit() {
93109
return luceneTopNLimit;
94110
}
111+
112+
public ByteSizeValue intermediateLocalRelationMaxSize() {
113+
return intermediateLocalRelationMaxSize;
114+
}
95115
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,18 +180,18 @@ private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> acti
180180
}
181181

182182
public static PhysicalPlan localPlan(
183-
PhysicalSettings physicalSettings,
183+
PlannerSettings plannerSettings,
184184
EsqlFlags flags,
185185
List<SearchExecutionContext> searchContexts,
186186
Configuration configuration,
187187
FoldContext foldCtx,
188188
PhysicalPlan plan
189189
) {
190-
return localPlan(physicalSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
190+
return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
191191
}
192192

193193
public static PhysicalPlan localPlan(
194-
PhysicalSettings physicalSettings,
194+
PlannerSettings plannerSettings,
195195
EsqlFlags flags,
196196
Configuration configuration,
197197
FoldContext foldCtx,
@@ -200,7 +200,7 @@ public static PhysicalPlan localPlan(
200200
) {
201201
final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
202202
var physicalOptimizer = new LocalPhysicalPlanOptimizer(
203-
new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats)
203+
new LocalPhysicalOptimizerContext(plannerSettings, flags, configuration, foldCtx, searchStats)
204204
);
205205

206206
return localPlan(plan, logicalOptimizer, physicalOptimizer);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
6262
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
6363
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
64-
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
64+
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
6565
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
6666
import org.elasticsearch.xpack.esql.session.Configuration;
6767
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
@@ -140,7 +140,7 @@ public class ComputeService {
140140
private final DataNodeComputeHandler dataNodeComputeHandler;
141141
private final ClusterComputeHandler clusterComputeHandler;
142142
private final ExchangeService exchangeService;
143-
private final PhysicalSettings physicalSettings;
143+
private final PlannerSettings plannerSettings;
144144

145145
@SuppressWarnings("this-escape")
146146
public ComputeService(
@@ -179,7 +179,7 @@ public ComputeService(
179179
esqlExecutor,
180180
dataNodeComputeHandler
181181
);
182-
this.physicalSettings = new PhysicalSettings(clusterService);
182+
this.plannerSettings = transportActionServices.plannerSettings();
183183
}
184184

185185
public void execute(
@@ -615,7 +615,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
615615
context.foldCtx(),
616616
contexts,
617617
searchService.getIndicesService().getAnalysis(),
618-
physicalSettings
618+
plannerSettings
619619
);
620620
try {
621621
LocalExecutionPlanner planner = new LocalExecutionPlanner(
@@ -638,7 +638,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
638638
LOGGER.debug("Received physical plan:\n{}", plan);
639639

640640
var localPlan = PlannerUtils.localPlan(
641-
physicalSettings,
641+
plannerSettings,
642642
context.flags(),
643643
context.searchExecutionContexts(),
644644
context.configuration(),

0 commit comments

Comments
 (0)