diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java index f4ec3f099e068..a21c416ba8741 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java @@ -16,7 +16,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase; import org.junit.Before; @@ -56,7 +56,7 @@ public void configureChunks() throws IOException { boolean smallChunks = randomBoolean(); Request request = new Request("PUT", "/_cluster/settings"); XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject("persistent"); - builder.field(PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getKey(), smallChunks ? "1kb" : null); + builder.field(PlannerSettings.VALUES_LOADING_JUMBO_SIZE.getKey(), smallChunks ? "1kb" : null); request.setJsonEntity(Strings.toString(builder.endObject().endObject())); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 419ccced11e5b..3d47b5735f51a 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -59,7 +59,7 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.core.type.DataType.isMillisOrNanos; -import static org.elasticsearch.xpack.esql.planner.PhysicalSettings.LUCENE_TOPN_LIMIT; +import static org.elasticsearch.xpack.esql.planner.PlannerSettings.LUCENE_TOPN_LIMIT; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile; import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index d89c65a620a98..64fe2a1a409e9 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; @@ -35,6 +36,7 @@ import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.core.PathUtils; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; @@ -95,6 +97,7 @@ import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -424,6 +427,13 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L)); + public static final PlannerSettings TEST_PLANNER_SETTINGS = new PlannerSettings( + DataPartitioning.AUTO, + ByteSizeValue.ofMb(1), + 10_000, + ByteSizeValue.ofMb(1) + ); + public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices( createMockTransportService(), mock(SearchService.class), @@ -433,7 +443,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { mock(IndexNameExpressionResolver.class), null, new InferenceService(mock(Client.class)), - new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY) + new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY), + TEST_PLANNER_SETTINGS ); private static TransportService createMockTransportService() { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 27dce0b9bcbae..7da6db1b5235e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -71,7 +71,7 @@ import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -358,7 +358,7 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices, ); } ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory( - PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY), + PlannerSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY), fieldInfos, List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> { throw new IllegalStateException("can't load source here"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java index 5d6c2890aef1f..33cd4a13d9d4d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java @@ -8,13 +8,13 @@ package org.elasticsearch.xpack.esql.optimizer; import org.elasticsearch.xpack.esql.core.expression.FoldContext; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.stats.SearchStats; public record LocalPhysicalOptimizerContext( - PhysicalSettings physicalSettings, + PlannerSettings plannerSettings, EsqlFlags flags, Configuration configuration, FoldContext foldCtx, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java index 798e58c64bf3d..dc6eab059d8ba 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java @@ -29,7 +29,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -66,7 +66,7 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi @Override protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) { Pushable pushable = evaluatePushable( - ctx.physicalSettings(), + ctx.plannerSettings(), ctx.foldCtx(), topNExec, LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()) @@ -132,7 +132,7 @@ public PhysicalPlan rewrite(TopNExec topNExec) { } private static Pushable evaluatePushable( - PhysicalSettings physicalSettings, + PlannerSettings plannerSettings, FoldContext ctx, TopNExec topNExec, LucenePushdownPredicates lucenePushdownPredicates @@ -141,14 +141,14 @@ private static Pushable evaluatePushable( if (child instanceof EsQueryExec queryExec && queryExec.canPushSorts() && canPushDownOrders(topNExec.order(), lucenePushdownPredicates) - && canPushLimit(topNExec, physicalSettings)) { + && canPushLimit(topNExec, plannerSettings)) { // With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field return new PushableQueryExec(queryExec); } if (child instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec && queryExec.canPushSorts() - && canPushLimit(topNExec, physicalSettings)) { + && canPushLimit(topNExec, plannerSettings)) { // When we have an EVAL between the FROM and the SORT, we consider pushing down if the sort is on a field and/or // a distance function defined in the EVAL. We also move the EVAL to after the SORT. List orders = topNExec.order(); @@ -220,8 +220,8 @@ private static boolean canPushDownOrders(List orders, LucenePushdownPredi return orders.stream().allMatch(o -> isSortableAttribute.apply(o.child(), lucenePushdownPredicates)); } - private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) { - return Foldables.limitValue(topn.limit(), topn.sourceText()) <= physicalSettings.luceneTopNLimit(); + private static boolean canPushLimit(TopNExec topn, PlannerSettings plannerSettings) { + return Foldables.limitValue(topn.limit(), topn.sourceText()) <= plannerSettings.luceneTopNLimit(); } private static List buildFieldSorts(List orders) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 7b9966bc7926b..29c98a073938e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -144,17 +144,17 @@ public boolean hasReferences() { } private final List shardContexts; - private final PhysicalSettings physicalSettings; + private final PlannerSettings plannerSettings; public EsPhysicalOperationProviders( FoldContext foldContext, List shardContexts, AnalysisRegistry analysisRegistry, - PhysicalSettings physicalSettings + PlannerSettings plannerSettings ) { super(foldContext, analysisRegistry); this.shardContexts = shardContexts; - this.physicalSettings = physicalSettings; + this.plannerSettings = plannerSettings; } @Override @@ -176,7 +176,7 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi } var fields = extractFields(fieldExtractExec); return source.with( - new ValuesSourceReaderOperator.Factory(physicalSettings.valuesLoadingJumboSize(), fields, readers, docChannel), + new ValuesSourceReaderOperator.Factory(plannerSettings.valuesLoadingJumboSize(), fields, readers, docChannel), layout.build() ); } @@ -299,7 +299,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()), + context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), limit, @@ -319,7 +319,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.queryBuilderAndTags()), - context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()), + context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), context.autoPartitioningStrategy().get(), context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), @@ -388,7 +388,7 @@ public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext cont return new LuceneCountOperator.Factory( shardContexts, querySupplier(queryBuilder), - context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()), + context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), context.queryPragmas().taskConcurrency(), List.of(), limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx()) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java similarity index 71% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java index 369fc73d4fdd3..d23825af11284 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java @@ -19,7 +19,7 @@ /** * Values for cluster level settings used in physical planning. */ -public class PhysicalSettings { +public class PlannerSettings { public static final Setting DEFAULT_DATA_PARTITIONING = Setting.enumSetting( DataPartitioning.class, "esql.default_data_partitioning", @@ -45,26 +45,42 @@ public class PhysicalSettings { Setting.Property.Dynamic ); + public static final Setting INTERMEDIATE_LOCAL_RELATION_MAX_SIZE = Setting.memorySizeSetting( + "esql.intermediate_local_relation_max_size", + "0.1%", + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private volatile DataPartitioning defaultDataPartitioning; private volatile ByteSizeValue valuesLoadingJumboSize; private volatile int luceneTopNLimit; + private volatile ByteSizeValue intermediateLocalRelationMaxSize; /** * Ctor for prod that listens for updates from the {@link ClusterService}. */ - public PhysicalSettings(ClusterService clusterService) { - clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); - clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v); - clusterService.getClusterSettings().initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v); + public PlannerSettings(ClusterService clusterService) { + var clusterSettings = clusterService.getClusterSettings(); + clusterSettings.initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); + clusterSettings.initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v); + clusterSettings.initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v); + clusterSettings.initializeAndWatch(INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, v -> this.intermediateLocalRelationMaxSize = v); } /** * Ctor for testing. */ - public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, int luceneTopNLimit) { + public PlannerSettings( + DataPartitioning defaultDataPartitioning, + ByteSizeValue valuesLoadingJumboSize, + int luceneTopNLimit, + ByteSizeValue intermediateLocalRelationMaxSize + ) { this.defaultDataPartitioning = defaultDataPartitioning; this.valuesLoadingJumboSize = valuesLoadingJumboSize; this.luceneTopNLimit = luceneTopNLimit; + this.intermediateLocalRelationMaxSize = intermediateLocalRelationMaxSize; } public DataPartitioning defaultDataPartitioning() { @@ -92,4 +108,8 @@ public ByteSizeValue valuesLoadingJumboSize() { public int luceneTopNLimit() { return luceneTopNLimit; } + + public ByteSizeValue intermediateLocalRelationMaxSize() { + return intermediateLocalRelationMaxSize; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index fc7184d01976e..c20034ad6e371 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -180,18 +180,18 @@ private static void forEachRelation(PhysicalPlan plan, Consumer acti } public static PhysicalPlan localPlan( - PhysicalSettings physicalSettings, + PlannerSettings plannerSettings, EsqlFlags flags, List searchContexts, Configuration configuration, FoldContext foldCtx, PhysicalPlan plan ) { - return localPlan(physicalSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts)); + return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts)); } public static PhysicalPlan localPlan( - PhysicalSettings physicalSettings, + PlannerSettings plannerSettings, EsqlFlags flags, Configuration configuration, FoldContext foldCtx, @@ -200,7 +200,7 @@ public static PhysicalPlan localPlan( ) { final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); var physicalOptimizer = new LocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats) + new LocalPhysicalOptimizerContext(plannerSettings, flags, configuration, foldCtx, searchStats) ); return localPlan(plan, logicalOptimizer, physicalOptimizer); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ef72beff1ebf8..7d5f75f9201f9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -61,7 +61,7 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; @@ -140,7 +140,7 @@ public class ComputeService { private final DataNodeComputeHandler dataNodeComputeHandler; private final ClusterComputeHandler clusterComputeHandler; private final ExchangeService exchangeService; - private final PhysicalSettings physicalSettings; + private final PlannerSettings plannerSettings; @SuppressWarnings("this-escape") public ComputeService( @@ -179,7 +179,7 @@ public ComputeService( esqlExecutor, dataNodeComputeHandler ); - this.physicalSettings = new PhysicalSettings(clusterService); + this.plannerSettings = transportActionServices.plannerSettings(); } public void execute( @@ -615,7 +615,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) { context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis(), - physicalSettings + plannerSettings ); try { LocalExecutionPlanner planner = new LocalExecutionPlanner( @@ -638,7 +638,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) { LOGGER.debug("Received physical plan:\n{}", plan); var localPlan = PlannerUtils.localPlan( - physicalSettings, + plannerSettings, context.flags(), context.searchExecutionContexts(), context.configuration(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index c026718bf178a..f7e1de8ed2882 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -76,7 +76,7 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamWrapperQueryBuilder; import org.elasticsearch.xpack.esql.plan.PlanWritables; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog; import org.elasticsearch.xpack.esql.session.IndexResolver; @@ -276,9 +276,10 @@ public List> getSettings() { ESQL_QUERYLOG_THRESHOLD_INFO_SETTING, ESQL_QUERYLOG_THRESHOLD_WARN_SETTING, ESQL_QUERYLOG_INCLUDE_USER_SETTING, - PhysicalSettings.DEFAULT_DATA_PARTITIONING, - PhysicalSettings.VALUES_LOADING_JUMBO_SIZE, - PhysicalSettings.LUCENE_TOPN_LIMIT, + PlannerSettings.DEFAULT_DATA_PARTITIONING, + PlannerSettings.VALUES_LOADING_JUMBO_SIZE, + PlannerSettings.LUCENE_TOPN_LIMIT, + PlannerSettings.INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, STORED_FIELDS_SEQUENTIAL_PROPORTION, EsqlFlags.ESQL_STRING_LIKE_ON_INDEX, EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index cafe113f91f9a..d3cd54f3ba59f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -22,7 +22,7 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import java.io.IOException; import java.util.Locale; @@ -46,7 +46,7 @@ public final class QueryPragmas implements Writeable { * the enum {@link DataPartitioning} which has more documentation. Not an * {@link Setting#enumSetting} because those can't have {@code null} defaults. * {@code null} here means "use the default from the cluster setting - * named {@link PhysicalSettings#DEFAULT_DATA_PARTITIONING}." + * named {@link PlannerSettings#DEFAULT_DATA_PARTITIONING}." */ public static final Setting DATA_PARTITIONING = Setting.simpleString("data_partitioning"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java index 3158e3de4c08c..66defcf0ab1e3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java @@ -16,6 +16,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.usage.UsageService; import org.elasticsearch.xpack.esql.inference.InferenceService; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; public record TransportActionServices( TransportService transportService, @@ -26,5 +27,6 @@ public record TransportActionServices( IndexNameExpressionResolver indexNameExpressionResolver, UsageService usageService, InferenceService inferenceService, - BlockFactoryProvider blockFactoryProvider + BlockFactoryProvider blockFactoryProvider, + PlannerSettings plannerSettings ) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 82768bd4b0c15..22e05eb072bcd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -52,6 +52,7 @@ import org.elasticsearch.xpack.esql.execution.PlanExecutor; import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute; import org.elasticsearch.xpack.esql.inference.InferenceService; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlSession.PlanRunner; import org.elasticsearch.xpack.esql.session.Result; @@ -171,7 +172,8 @@ public TransportEsqlQueryAction( indexNameExpressionResolver, usageService, new InferenceService(client), - blockFactoryProvider + blockFactoryProvider, + new PlannerSettings(clusterService) ); this.computeService = new ComputeService( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 95d28081696d1..15831484755a0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -108,9 +108,6 @@ public interface PlanRunner { } private static final TransportVersion LOOKUP_JOIN_CCS = TransportVersion.fromName("lookup_join_ccs"); - private static final double INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE = .1; - private static final long INTERMEDIATE_LOCAL_RELATION_MIN_SIZE = ByteSizeValue.ofMb(1).getBytes(); - private static final long INTERMEDIATE_LOCAL_RELATION_MAX_SIZE = ByteSizeValue.ofMb(30).getBytes(); private final String sessionId; private final Configuration configuration; @@ -131,7 +128,7 @@ public interface PlanRunner { private final InferenceService inferenceService; private final RemoteClusterService remoteClusterService; private final BlockFactory blockFactory; - private final long maxIntermediateLocalRelationSize; + private final ByteSizeValue intermediateLocalRelationMaxSize; private boolean explainMode; private String parsedPlanString; @@ -169,7 +166,7 @@ public EsqlSession( this.preMapper = new PreMapper(services); this.remoteClusterService = services.transportService().getRemoteClusterService(); this.blockFactory = services.blockFactoryProvider().blockFactory(); - maxIntermediateLocalRelationSize = maxIntermediateLocalRelationSize(blockFactory); + this.intermediateLocalRelationMaxSize = services.plannerSettings().intermediateLocalRelationMaxSize(); } public String sessionId() { @@ -339,11 +336,8 @@ private LocalRelation resultToPlan(Source planSource, Result result) { List pages = result.pages(); checkPagesBelowSize( pages, - maxIntermediateLocalRelationSize, - (actual) -> "sub-plan execution results too large [" - + ByteSizeValue.ofBytes(actual) - + "] > " - + ByteSizeValue.ofBytes(maxIntermediateLocalRelationSize) + intermediateLocalRelationMaxSize, + actual -> "sub-plan execution results too large [" + ByteSizeValue.ofBytes(actual) + "] > " + intermediateLocalRelationMaxSize ); List schema = result.schema(); Block[] blocks = SessionUtils.fromPages(schema, pages, blockFactory); @@ -357,14 +351,6 @@ private static void releaseLocalRelationBlocks(AtomicReference localRel } } - // returns INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE percent of the circuit breaker limit, but at least - // INTERMEDIATE_LOCAL_RELATION_MIN_SIZE and at most INTERMEDIATE_LOCAL_RELATION_MAX_SIZE - static long maxIntermediateLocalRelationSize(BlockFactory blockFactory) { - long breakerLimit = blockFactory.breaker().getLimit(); - long percentageLimit = (long) (breakerLimit * INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE / 100.d); - return Math.min(Math.max(percentageLimit, INTERMEDIATE_LOCAL_RELATION_MIN_SIZE), INTERMEDIATE_LOCAL_RELATION_MAX_SIZE); - } - private EsqlStatement parse(String query, QueryParams params) { var parsed = new EsqlParser().createQuery(query, params, planTelemetry, configuration); if (LOGGER.isDebugEnabled()) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java index fb8f817c5ba03..c622e3edc2c0a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.session; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; @@ -43,9 +44,9 @@ public static Block[] fromPages(List schema, List pages, BlockF return blocks; } - public static long checkPagesBelowSize(List pages, long maxSize, LongFunction exceptionMessage) { + public static long checkPagesBelowSize(List pages, ByteSizeValue maxSize, LongFunction exceptionMessage) { long currentSize = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); - if (currentSize > maxSize) { + if (currentSize > maxSize.getBytes()) { throw new IllegalArgumentException(exceptionMessage.apply(currentSize)); } return currentSize; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 172017d9cde90..272d175f69bff 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverRunner; @@ -88,7 +87,6 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlan; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; @@ -123,6 +121,7 @@ import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues; import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv; import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; @@ -727,10 +726,9 @@ void executeSubPlan( if (dataNodePlan != null) { var searchStats = new DisabledSearchStats(); var logicalTestOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)); - var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); var flags = new EsqlFlags(true); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats) + new LocalPhysicalOptimizerContext(TEST_PLANNER_SETTINGS, flags, configuration, foldCtx, searchStats) ); var csvDataNodePhysicalPlan = PlannerUtils.localPlan(dataNodePlan, logicalTestOptimizer, physicalTestOptimizer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index d4d57809bdc04..3a915bb937ce4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -12,9 +12,7 @@ import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.Fuzziness; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.MapperService; @@ -98,7 +96,6 @@ import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.planner.FilterTests; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; @@ -131,6 +128,7 @@ import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termsQuery; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; @@ -2475,10 +2473,9 @@ public void testVerifierOnMissingReferences() throws Exception { } private LocalPhysicalPlanOptimizer getCustomRulesLocalPhysicalPlanOptimizer(List> batches) { - var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); var flags = new EsqlFlags(true); LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext( - physicalSettings, + TEST_PLANNER_SETTINGS, flags, config, FoldContext.small(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index b6b8367379b37..a9c938e6b58d2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -14,11 +14,9 @@ import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.aggregation.AggregatorMode; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.compute.test.TestBlockFactory; @@ -138,7 +136,6 @@ import org.elasticsearch.xpack.esql.plan.physical.UnaryExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; @@ -172,6 +169,7 @@ import static org.elasticsearch.index.query.QueryBuilders.existsQuery; import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; @@ -7876,9 +7874,8 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP // The TopN needs an estimated row size for the planner to work var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config); plan = useDataNodePlan ? plans.v2() : plans.v1(); - var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); var flags = new EsqlFlags(true); - plan = PlannerUtils.localPlan(physicalSettings, flags, config, FoldContext.small(), plan, TEST_SEARCH_STATS); + plan = PlannerUtils.localPlan(TEST_PLANNER_SETTINGS, flags, config, FoldContext.small(), plan, TEST_SEARCH_STATS); ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10); LocalExecutionPlanner planner = new LocalExecutionPlanner( "test", @@ -7893,12 +7890,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP null, null, null, - new EsPhysicalOperationProviders( - FoldContext.small(), - List.of(), - null, - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000) - ), + new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, TEST_PLANNER_SETTINGS), List.of() ); @@ -8255,9 +8247,8 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) { // individually hence why here the plan is kept as is var l = p.transformUp(FragmentExec.class, fragment -> { - var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); var flags = new EsqlFlags(true); - var localPlan = PlannerUtils.localPlan(physicalSettings, flags, config, FoldContext.small(), fragment, searchStats); + var localPlan = PlannerUtils.localPlan(TEST_PLANNER_SETTINGS, flags, config, FoldContext.small(), fragment, searchStats); return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), localPlan); }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 6602c461c1735..018fb23a03bbd 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.esql.optimizer; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.core.expression.FoldContext; @@ -16,13 +14,14 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.stats.SearchStats; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; + public class TestPlannerOptimizer { private final EsqlParser parser; private final Analyzer analyzer; @@ -78,9 +77,8 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E var logicalTestOptimizer = new LocalLogicalPlanOptimizer( new LocalLogicalOptimizerContext(config, FoldContext.small(), searchStats) ); - var physicalSettings = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(physicalSettings, esqlFlags, config, FoldContext.small(), searchStats), + new LocalPhysicalOptimizerContext(TEST_PLANNER_SETTINGS, esqlFlags, config, FoldContext.small(), searchStats), true ); var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java index 82a90ba283754..5dd5bb6f93a1a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java @@ -8,8 +8,6 @@ package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.geometry.Geometry; import org.elasticsearch.geometry.utils.GeometryValidator; import org.elasticsearch.geometry.utils.WellKnownBinary; @@ -37,7 +35,6 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; -import org.elasticsearch.xpack.esql.planner.PhysicalSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.stats.SearchStats; @@ -51,6 +48,7 @@ import java.util.Map; import java.util.function.Function; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE; import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_POINT; import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER; @@ -421,7 +419,7 @@ private static void assertNoPushdownSort(TestPhysicalPlanBuilder builder, String private static PhysicalPlan pushTopNToSource(TopNExec topNExec) { var configuration = EsqlTestUtils.configuration("from test"); var ctx = new LocalPhysicalOptimizerContext( - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000), + TEST_PLANNER_SETTINGS, new EsqlFlags(true), configuration, FoldContext.small(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java index 23bc2f2b2990b..dc3f3c374bd37 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java @@ -9,8 +9,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; @@ -50,6 +48,7 @@ import java.util.function.BiFunction; import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; import static org.hamcrest.Matchers.equalTo; public class EsPhysicalOperationProvidersTests extends ESTestCase { @@ -82,7 +81,7 @@ record TestCase(QueryBuilder query, List nullsFilteredFields) { FoldContext.small(), List.of(new EsPhysicalOperationProviders.DefaultShardContext(0, () -> {}, createMockContext(), AliasFilter.EMPTY)), null, - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000) + TEST_PLANNER_SETTINGS ); for (TestCase testCase : testCases) { EsQueryExec queryExec = new EsQueryExec( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 014733ab005b6..804a74b0ff31f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -19,9 +19,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; @@ -70,6 +68,7 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_PLANNER_SETTINGS; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -326,12 +325,7 @@ private Configuration config() { } private EsPhysicalOperationProviders esPhysicalOperationProviders(List shardContexts) { - return new EsPhysicalOperationProviders( - FoldContext.small(), - shardContexts, - null, - new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000) - ); + return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, TEST_PLANNER_SETTINGS); } private List createShardContexts() throws IOException { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java index d599e924066d2..d71e5ee39df06 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java @@ -31,7 +31,6 @@ import static org.elasticsearch.common.unit.ByteSizeUnit.GB; import static org.elasticsearch.xpack.esql.plan.AbstractNodeSerializationTests.randomSource; -import static org.elasticsearch.xpack.esql.session.EsqlSession.maxIntermediateLocalRelationSize; import static org.elasticsearch.xpack.esql.session.SessionUtils.checkPagesBelowSize; import static org.elasticsearch.xpack.esql.session.SessionUtils.fromPages; import static org.hamcrest.Matchers.containsString; @@ -90,7 +89,7 @@ public void testCheckPagesBelowSize() { var message = "data too large: "; var ex = assertThrows( IllegalArgumentException.class, - () -> checkPagesBelowSize(pagesRec.pages, pagesRec.dataLen - 1, l -> message + l) + () -> checkPagesBelowSize(pagesRec.pages, ByteSizeValue.ofBytes(pagesRec.dataLen - 1), l -> message + l) ); // pages are mocked, their size is considerably larger than dataLen long pagesRamSize = pagesRec.pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); @@ -98,26 +97,6 @@ public void testCheckPagesBelowSize() { } } - // EsqlSession's - public void testMaxIntermediateLocalRelationSize() { - var circuitBreaker = mock(CircuitBreaker.class); - var blockFactory = mock(BlockFactory.class); - when(blockFactory.breaker()).thenReturn(circuitBreaker); - - // enforcing upper limit - when(circuitBreaker.getLimit()).thenReturn(ByteSizeValue.ofGb(32).getBytes()); - assertThat(maxIntermediateLocalRelationSize(blockFactory), is(ByteSizeValue.ofMb(30).getBytes())); - - // enforcing lower limit - when(circuitBreaker.getLimit()).thenReturn(ByteSizeValue.ofMb(32).getBytes()); - assertThat(maxIntermediateLocalRelationSize(blockFactory), is(ByteSizeValue.ofMb(1).getBytes())); - - // in-between limits - var twentyGb = ByteSizeValue.ofGb(20).getBytes(); - when(circuitBreaker.getLimit()).thenReturn(twentyGb); - assertThat(maxIntermediateLocalRelationSize(blockFactory), is((long) (twentyGb / 1000.d))); - } - private static PagesRec generatePageSet(BlockFactory blockFactory) { final int minBytes = 500; final int maxBytes = randomIntBetween(minBytes, minBytes * 1_000);