@@ -16,7 +16,7 @@
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase;
import org.junit.Before;
@@ -56,7 +56,7 @@ public void configureChunks() throws IOException {
boolean smallChunks = randomBoolean();
Request request = new Request("PUT", "/_cluster/settings");
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject("persistent");
builder.field(PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getKey(), smallChunks ? "1kb" : null);
builder.field(PlannerSettings.VALUES_LOADING_JUMBO_SIZE.getKey(), smallChunks ? "1kb" : null);
request.setJsonEntity(Strings.toString(builder.endObject().endObject()));
assertOK(client().performRequest(request));
}
@@ -59,7 +59,7 @@
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.core.type.DataType.isMillisOrNanos;
import static org.elasticsearch.xpack.esql.planner.PhysicalSettings.LUCENE_TOPN_LIMIT;
import static org.elasticsearch.xpack.esql.planner.PlannerSettings.LUCENE_TOPN_LIMIT;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile;
import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse;
@@ -24,6 +24,7 @@
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -35,6 +36,7 @@
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
@@ -95,6 +97,7 @@
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -424,6 +427,13 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {

public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));

public static final PlannerSettings TEST_PLANNER_SETTINGS = new PlannerSettings(
DataPartitioning.AUTO,
ByteSizeValue.ofMb(1),
10_000,
ByteSizeValue.ofMb(1)
);

public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
createMockTransportService(),
mock(SearchService.class),
@@ -433,7 +443,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
mock(IndexNameExpressionResolver.class),
null,
new InferenceService(mock(Client.class)),
new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY)
new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY),
TEST_PLANNER_SETTINGS
);

private static TransportService createMockTransportService() {
@@ -71,7 +71,7 @@
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -358,7 +358,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
);
}
ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
PlannerSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
fieldInfos,
List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
throw new IllegalStateException("can't load source here");
@@ -8,13 +8,13 @@
package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.stats.SearchStats;

public record LocalPhysicalOptimizerContext(
PhysicalSettings physicalSettings,
PlannerSettings plannerSettings,
EsqlFlags flags,
Configuration configuration,
FoldContext foldCtx,
@@ -29,7 +29,7 @@
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;

import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -66,7 +66,7 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi
@Override
protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
Pushable pushable = evaluatePushable(
ctx.physicalSettings(),
ctx.plannerSettings(),
ctx.foldCtx(),
topNExec,
LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags())
@@ -132,7 +132,7 @@ public PhysicalPlan rewrite(TopNExec topNExec) {
}

private static Pushable evaluatePushable(
PhysicalSettings physicalSettings,
PlannerSettings plannerSettings,
FoldContext ctx,
TopNExec topNExec,
LucenePushdownPredicates lucenePushdownPredicates
@@ -141,14 +141,14 @@ private static Pushable evaluatePushable(
if (child instanceof EsQueryExec queryExec
&& queryExec.canPushSorts()
&& canPushDownOrders(topNExec.order(), lucenePushdownPredicates)
&& canPushLimit(topNExec, physicalSettings)) {
&& canPushLimit(topNExec, plannerSettings)) {
// With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field
return new PushableQueryExec(queryExec);
}
if (child instanceof EvalExec evalExec
&& evalExec.child() instanceof EsQueryExec queryExec
&& queryExec.canPushSorts()
&& canPushLimit(topNExec, physicalSettings)) {
&& canPushLimit(topNExec, plannerSettings)) {
// When we have an EVAL between the FROM and the SORT, we consider pushing down if the sort is on a field and/or
// a distance function defined in the EVAL. We also move the EVAL to after the SORT.
List<Order> orders = topNExec.order();
@@ -220,8 +220,8 @@ private static boolean canPushDownOrders(List<Order> orders, LucenePushdownPredi
return orders.stream().allMatch(o -> isSortableAttribute.apply(o.child(), lucenePushdownPredicates));
}

private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) {
return Foldables.limitValue(topn.limit(), topn.sourceText()) <= physicalSettings.luceneTopNLimit();
private static boolean canPushLimit(TopNExec topn, PlannerSettings plannerSettings) {
return Foldables.limitValue(topn.limit(), topn.sourceText()) <= plannerSettings.luceneTopNLimit();
}

private static List<EsQueryExec.Sort> buildFieldSorts(List<Order> orders) {
@@ -144,17 +144,17 @@ public boolean hasReferences() {
}

private final List<ShardContext> shardContexts;
private final PhysicalSettings physicalSettings;
private final PlannerSettings plannerSettings;

public EsPhysicalOperationProviders(
FoldContext foldContext,
List<ShardContext> shardContexts,
AnalysisRegistry analysisRegistry,
PhysicalSettings physicalSettings
PlannerSettings plannerSettings
) {
super(foldContext, analysisRegistry);
this.shardContexts = shardContexts;
this.physicalSettings = physicalSettings;
this.plannerSettings = plannerSettings;
}

@Override
@@ -176,7 +176,7 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
}
var fields = extractFields(fieldExtractExec);
return source.with(
new ValuesSourceReaderOperator.Factory(physicalSettings.valuesLoadingJumboSize(), fields, readers, docChannel),
new ValuesSourceReaderOperator.Factory(plannerSettings.valuesLoadingJumboSize(), fields, readers, docChannel),
layout.build()
);
}
@@ -299,7 +299,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
luceneFactory = new LuceneTopNSourceOperator.Factory(
shardContexts,
querySupplier(esQueryExec.query()),
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
context.queryPragmas().taskConcurrency(),
context.pageSize(rowEstimatedSize),
limit,
@@ -319,7 +319,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
luceneFactory = new LuceneSourceOperator.Factory(
shardContexts,
querySupplier(esQueryExec.queryBuilderAndTags()),
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
context.autoPartitioningStrategy().get(),
context.queryPragmas().taskConcurrency(),
context.pageSize(rowEstimatedSize),
@@ -388,7 +388,7 @@ public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext cont
return new LuceneCountOperator.Factory(
shardContexts,
querySupplier(queryBuilder),
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
context.queryPragmas().taskConcurrency(),
List.of(),
limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx())
@@ -19,7 +19,7 @@
/**
* Values for cluster level settings used in physical planning.
*/
public class PhysicalSettings {
public class PlannerSettings {
Member: Ah, I was expecting a LogicalSettings one day, but this is just as good.

Contributor Author: I was a bit hesitant about this rename, but I thought it might make sense since there's no hard distinction between the two so far, and even the new one is actually used while executing (subplans). But yes, they can be untangled if needed.

public static final Setting<DataPartitioning> DEFAULT_DATA_PARTITIONING = Setting.enumSetting(
DataPartitioning.class,
"esql.default_data_partitioning",
@@ -45,26 +45,42 @@ public class PhysicalSettings {
Setting.Property.Dynamic
);

public static final Setting<ByteSizeValue> INTERMEDIATE_LOCAL_RELATION_MAX_SIZE = Setting.memorySizeSetting(
"esql.intermediate_local_relation_max_size",
"0.1%",
Member: On a 512mb heap this is half a megabyte. That feels fine.

Contributor Author: Yeah, I dropped the min/max bounds for the limit, since I think this proportionality should work by itself. But I can add them back if it proves useful.

Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile DataPartitioning defaultDataPartitioning;
private volatile ByteSizeValue valuesLoadingJumboSize;
private volatile int luceneTopNLimit;
private volatile ByteSizeValue intermediateLocalRelationMaxSize;

/**
* Ctor for prod that listens for updates from the {@link ClusterService}.
*/
public PhysicalSettings(ClusterService clusterService) {
clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v);
clusterService.getClusterSettings().initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v);
public PlannerSettings(ClusterService clusterService) {
var clusterSettings = clusterService.getClusterSettings();
clusterSettings.initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
clusterSettings.initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v);
clusterSettings.initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v);
clusterSettings.initializeAndWatch(INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, v -> this.intermediateLocalRelationMaxSize = v);
}

/**
* Ctor for testing.
*/
public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, int luceneTopNLimit) {
public PlannerSettings(
DataPartitioning defaultDataPartitioning,
ByteSizeValue valuesLoadingJumboSize,
int luceneTopNLimit,
ByteSizeValue intermediateLocalRelationMaxSize
) {
this.defaultDataPartitioning = defaultDataPartitioning;
this.valuesLoadingJumboSize = valuesLoadingJumboSize;
this.luceneTopNLimit = luceneTopNLimit;
this.intermediateLocalRelationMaxSize = intermediateLocalRelationMaxSize;
}

public DataPartitioning defaultDataPartitioning() {
@@ -92,4 +108,8 @@ public ByteSizeValue valuesLoadingJumboSize() {
public int luceneTopNLimit() {
return luceneTopNLimit;
}

public ByteSizeValue intermediateLocalRelationMaxSize() {
return intermediateLocalRelationMaxSize;
}
}
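A quick sanity check of the review discussion above. This sketch is not part of the PR and the class name is made up for illustration; it assumes the "0.1%" default of esql.intermediate_local_relation_max_size resolves against the JVM heap, as percentage defaults of memory-size settings do, and just confirms that 0.1% of a 512mb heap is roughly half a megabyte.

import org.elasticsearch.common.unit.ByteSizeValue;

class IntermediateRelationSizeCheck {
    public static void main(String[] args) {
        long heapBytes = ByteSizeValue.ofMb(512).getBytes(); // a 512mb heap
        long maxSize = heapBytes / 1000;                      // 0.1% of that heap
        // prints roughly 524kb, i.e. about half a megabyte, matching the reviewer's estimate
        System.out.println(ByteSizeValue.ofBytes(maxSize));
    }
}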
@@ -180,18 +180,18 @@ private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> acti
}

public static PhysicalPlan localPlan(
PhysicalSettings physicalSettings,
PlannerSettings plannerSettings,
EsqlFlags flags,
List<SearchExecutionContext> searchContexts,
Configuration configuration,
FoldContext foldCtx,
PhysicalPlan plan
) {
return localPlan(physicalSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
}

public static PhysicalPlan localPlan(
PhysicalSettings physicalSettings,
PlannerSettings plannerSettings,
EsqlFlags flags,
Configuration configuration,
FoldContext foldCtx,
Expand All @@ -200,7 +200,7 @@ public static PhysicalPlan localPlan(
) {
final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
var physicalOptimizer = new LocalPhysicalPlanOptimizer(
new LocalPhysicalOptimizerContext(physicalSettings, flags, configuration, foldCtx, searchStats)
new LocalPhysicalOptimizerContext(plannerSettings, flags, configuration, foldCtx, searchStats)
);

return localPlan(plan, logicalOptimizer, physicalOptimizer);
@@ -61,7 +61,7 @@
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
@@ -140,7 +140,7 @@ public class ComputeService {
private final DataNodeComputeHandler dataNodeComputeHandler;
private final ClusterComputeHandler clusterComputeHandler;
private final ExchangeService exchangeService;
private final PhysicalSettings physicalSettings;
private final PlannerSettings plannerSettings;

@SuppressWarnings("this-escape")
public ComputeService(
@@ -179,7 +179,7 @@ public ComputeService(
esqlExecutor,
dataNodeComputeHandler
);
this.physicalSettings = new PhysicalSettings(clusterService);
this.plannerSettings = transportActionServices.plannerSettings();
}

public void execute(
Expand Down Expand Up @@ -615,7 +615,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
context.foldCtx(),
contexts,
searchService.getIndicesService().getAnalysis(),
physicalSettings
plannerSettings
);
try {
LocalExecutionPlanner planner = new LocalExecutionPlanner(
@@ -638,7 +638,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
LOGGER.debug("Received physical plan:\n{}", plan);

var localPlan = PlannerUtils.localPlan(
physicalSettings,
plannerSettings,
context.flags(),
context.searchExecutionContexts(),
context.configuration(),
@@ -76,7 +76,7 @@
import org.elasticsearch.xpack.esql.io.stream.PlanStreamWrapperQueryBuilder;
import org.elasticsearch.xpack.esql.plan.PlanWritables;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
import org.elasticsearch.xpack.esql.session.IndexResolver;
@@ -276,9 +276,10 @@ public List<Setting<?>> getSettings() {
ESQL_QUERYLOG_THRESHOLD_INFO_SETTING,
ESQL_QUERYLOG_THRESHOLD_WARN_SETTING,
ESQL_QUERYLOG_INCLUDE_USER_SETTING,
PhysicalSettings.DEFAULT_DATA_PARTITIONING,
PhysicalSettings.VALUES_LOADING_JUMBO_SIZE,
PhysicalSettings.LUCENE_TOPN_LIMIT,
PlannerSettings.DEFAULT_DATA_PARTITIONING,
PlannerSettings.VALUES_LOADING_JUMBO_SIZE,
PlannerSettings.LUCENE_TOPN_LIMIT,
PlannerSettings.INTERMEDIATE_LOCAL_RELATION_MAX_SIZE,
STORED_FIELDS_SEQUENTIAL_PROPORTION,
EsqlFlags.ESQL_STRING_LIKE_ON_INDEX,
EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD
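Since INTERMEDIATE_LOCAL_RELATION_MAX_SIZE is registered here as a dynamic node-scope setting, it should be adjustable at runtime the same way the spec test at the top of this diff toggles VALUES_LOADING_JUMBO_SIZE. A minimal sketch, assuming the same REST test helpers (Request, JsonXContent, Strings, client(), assertOK); the "512kb" value is purely illustrative:

Request request = new Request("PUT", "/_cluster/settings");
XContentBuilder builder = JsonXContent.contentBuilder().startObject().startObject("persistent");
// illustrative value; a heap-relative string such as "0.2%" should also be accepted by the memory-size setting
builder.field(PlannerSettings.INTERMEDIATE_LOCAL_RELATION_MAX_SIZE.getKey(), "512kb");
request.setJsonEntity(Strings.toString(builder.endObject().endObject()));
assertOK(client().performRequest(request));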