diff --git a/docs/changelog/132512.yaml b/docs/changelog/132512.yaml new file mode 100644 index 0000000000000..66b55d45e36ec --- /dev/null +++ b/docs/changelog/132512.yaml @@ -0,0 +1,5 @@ +pr: 132512 +summary: Rewrite `RoundTo` to `QueryAndTags` +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToDouble.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToDouble.java index 9196f9abe7de1..8463fcd89f8c5 100644 --- a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToDouble.java +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToDouble.java @@ -25,7 +25,6 @@ class RoundToDouble { static final RoundTo.Build BUILD = (source, field, points) -> { double[] f = points.stream().mapToDouble(p -> ((Number) p).doubleValue()).toArray(); - Arrays.sort(f); return switch (f.length) { // TODO should be a consistent way to do the 0 version - is CASE(MV_COUNT(f) == 1, f[0]) case 1 -> new RoundToDouble1Evaluator.Factory(source, field, f[0]); diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToInt.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToInt.java index 0dfd93ecd2d18..d30a254745059 100644 --- a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToInt.java +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToInt.java @@ -25,7 +25,6 @@ class RoundToInt { static final RoundTo.Build BUILD = (source, field, points) -> { int[] f = points.stream().mapToInt(p -> ((Number) p).intValue()).toArray(); - Arrays.sort(f); return switch (f.length) { // TODO should be a consistent way to do the 0 version - is CASE(MV_COUNT(f) == 1, f[0]) case 1 -> new RoundToInt1Evaluator.Factory(source, field, f[0]); diff --git a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToLong.java b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToLong.java index 6fbd5dbd2caca..28af14038e47d 100644 --- a/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToLong.java +++ b/x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundToLong.java @@ -25,7 +25,6 @@ class RoundToLong { static final RoundTo.Build BUILD = (source, field, points) -> { long[] f = points.stream().mapToLong(p -> ((Number) p).longValue()).toArray(); - Arrays.sort(f); return switch (f.length) { // TODO should be a consistent way to do the 0 version - is CASE(MV_COUNT(f) == 1, f[0]) case 1 -> new RoundToLong1Evaluator.Factory(source, field, f[0]); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundTo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundTo.java index 1b98db093e258..e0bd293c25dc4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundTo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/RoundTo.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.DataTypeConverter; import org.elasticsearch.xpack.esql.expression.Foldables; import org.elasticsearch.xpack.esql.expression.function.Example; import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isFoldable; @@ -181,7 +183,8 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { ExpressionEvaluator.Factory field = toEvaluator.apply(field()); field = Cast.cast(source(), field().dataType(), dataType, field); List points = Iterators.toList(Iterators.map(points().iterator(), p -> Foldables.valueOf(toEvaluator.foldCtx(), p))); - return build.build(source(), field, points); + List sortedPoints = sortedRoundingPoints(points, dataType); // provide sorted points to the evaluator + return build.build(source(), field, sortedPoints); } interface Build { @@ -195,4 +198,27 @@ interface Build { Map.entry(LONG, RoundToLong.BUILD), Map.entry(DOUBLE, RoundToDouble.BUILD) ); + + public static List sortedRoundingPoints(List points, DataType dataType) { + List pointsTobeSorted = points.stream().filter(Objects::nonNull).map(p -> (Number) p).toList(); + + return switch (dataType) { + case INTEGER -> pointsTobeSorted.stream() + .mapToInt(Number::intValue) + .sorted() + .boxed() + .collect(java.util.stream.Collectors.toList()); + case DOUBLE -> pointsTobeSorted.stream() + .mapToDouble(Number::doubleValue) + .sorted() + .boxed() + .collect(java.util.stream.Collectors.toList()); + case LONG, DATETIME, DATE_NANOS -> pointsTobeSorted.stream() + .mapToLong(DataTypeConverter::safeToLong) + .sorted() + .boxed() + .collect(java.util.stream.Collectors.toList()); + default -> throw new IllegalArgumentException("Unsupported data type: " + dataType); + }; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/X-RoundTo.java.st b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/X-RoundTo.java.st index 849d4d338a386..6da3d00743c2e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/X-RoundTo.java.st +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/X-RoundTo.java.st @@ -25,7 +25,6 @@ import java.util.Arrays; class RoundTo$Type$ { static final RoundTo.Build BUILD = (source, field, points) -> { $type$[] f = points.stream().mapTo$Type$(p -> ((Number) p).$type$Value()).toArray(); - Arrays.sort(f); return switch (f.length) { // TODO should be a consistent way to do the 0 version - is CASE(MV_COUNT(f) == 1, f[0]) case 1 -> new RoundTo$Type$1Evaluator.Factory(source, field, f[0]); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java index af36963ac54a3..15cc1d54eb3a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushStatsToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushTopNToSource; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceRoundToWithQueryAndTags; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialDocValuesExtraction; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialShapeBoundsExtraction; @@ -60,7 +61,7 @@ protected List> batches() { } protected static List> rules(boolean optimizeForEsSource) { - List> esSourceRules = new ArrayList<>(6); + List> esSourceRules = new ArrayList<>(7); esSourceRules.add(new ReplaceSourceAttributes()); if (optimizeForEsSource) { esSourceRules.add(new PushTopNToSource()); @@ -74,6 +75,12 @@ protected static List> rules(boolean optimizeForEsSource) { // execute the rules multiple times to improve the chances of things being pushed down @SuppressWarnings("unchecked") var pushdown = new Batch("Push to ES", esSourceRules.toArray(Rule[]::new)); + + // execute the SubstituteRoundToWithQueryAndTags rule once after all the other pushdown rules are applied, as this rule generate + // multiple QueryBuilders according the number of RoundTo points, it should be applied after all the other eligible pushdowns are + // done, and it should be executed only once. + var substitutionRules = new Batch<>("Substitute RoundTo with QueryAndTags", Limiter.ONCE, new ReplaceRoundToWithQueryAndTags()); + // add the field extraction in just one pass // add it at the end after all the other rules have ran var fieldExtraction = new Batch<>( @@ -84,6 +91,6 @@ protected static List> rules(boolean optimizeForEsSource) { new SpatialShapeBoundsExtraction(), new ParallelizeTimeSeriesSource() ); - return List.of(pushdown, fieldExtraction); + return optimizeForEsSource ? List.of(pushdown, substitutionRules, fieldExtraction) : List.of(pushdown, fieldExtraction); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java index ba382b9800ece..9ee93fa6c8223 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java @@ -126,10 +126,10 @@ private static PhysicalPlan rewrite( queryExec.indexMode(), queryExec.indexNameWithModes(), queryExec.output(), - query, queryExec.limit(), queryExec.sorts(), - queryExec.estimatedRowSize() + queryExec.estimatedRowSize(), + List.of(new EsQueryExec.QueryBuilderAndTags(query, List.of())) ); // If the eval contains other aliases, not just field attributes, we need to keep them in the plan PhysicalPlan plan = evalFields.isEmpty() ? queryExec : new EvalExec(filterExec.source(), queryExec, evalFields); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTags.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTags.java new file mode 100644 index 0000000000000..4cdef35437ebf --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTags.java @@ -0,0 +1,488 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; + +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.querydsl.query.Query; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.core.util.Queries; +import org.elasticsearch.xpack.esql.expression.function.scalar.math.RoundTo; +import org.elasticsearch.xpack.esql.expression.predicate.Range; +import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNull; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; +import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.EvalExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.core.type.DataTypeConverter.safeToLong; +import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; +import static org.elasticsearch.xpack.esql.plugin.QueryPragmas.ROUNDTO_PUSHDOWN_THRESHOLD; + +/** + * {@code ReplaceRoundToWithQueryAndTags} builds a list of ranges and associated tags base on the rounding points defined in a + * {@code RoundTo} function. It then rewrites the {@code EsQueryExec.query()} into a corresponding list of {@code QueryBuilder}s and tags, + * each mapped to its respective range. + * + * Here are some examples: + * + * 1. Aggregation with date_histogram. + * The {@code DATE_TRUNC} function in the query below can be rewritten to {@code RoundTo} by {@code ReplaceDateTruncBucketWithRoundTo}. + * This rule pushes down the {@code RoundTo} function by creating a list of {@code QueryBuilderAndTags}, so that + * {@code EsPhysicalOperationProviders} can build {@code LuceneSliceQueue} with the corresponding list of {@code QueryAndTags} to process + * further. + * | STATS COUNT(*) BY d = DATE_TRUNC(1 day, date) + * becomes, the rounding points are calculated according to SearchStats and predicates from the query. + * | EVAL d = ROUND_TO(hire_date, 1697760000000, 1697846400000, 1697932800000) + * | STATS COUNT(*) BY d + * becomes + * [QueryBuilderAndTags[query={ + * "esql_single_value" : { + * "field" : "date", + * "next" : { + * "range" : { + * "date" : { + * "lt" : "2023-10-21T00:00:00.000Z", + * "time_zone" : "Z", + * "format" : "strict_date_optional_time", + * "boost" : 0.0 + * } + * } + * }, + * "source" : "date_trunc(1 day, date)@2:25" + * } + * }, tags=[1697760000000]], QueryBuilderAndTags[query={ + * "esql_single_value" : { + * "field" : "date", + * "next" : { + * "range" : { + * "date" : { + * "gte" : "2023-10-21T00:00:00.000Z", + * "lt" : "2023-10-22T00:00:00.000Z", + * "time_zone" : "Z", + * "format" : "strict_date_optional_time", + * "boost" : 0.0 + * } + * } + * }, + * "source" : "date_trunc(1 day, date)@2:25" + * } + * }, tags=[1697846400000]], QueryBuilderAndTags[query={ + * "esql_single_value" : { + * "field" : "date", + * "next" : { + * "range" : { + * "date" : { + * "gte" : "2023-10-22T00:00:00.000Z", + * "time_zone" : "Z", + * "format" : "strict_date_optional_time", + * "boost" : 0.0 + * } + * } + * }, + * "source" : "date_trunc(1 day, date)@2:25" + * } + * }, tags=[1697932800000]], QueryBuilderAndTags[query={ + * "bool" : { + * "must_not" : [ + * { + * "exists" : { + * "field" : "date", + * "boost" : 0.0 + * } + * } + * ], + * "boost" : 1.0 + * } + * }, tags=[null]]] + * + * 2. Aggregation with date_histogram and the other pushdown functions + * When there are other functions that can also be pushed down to Lucene, this rule combines the main query with the {@code RoundTo} + * ranges to create a list of {@code QueryBuilderAndTags}. The main query is then applied to each query leg. + * | WHERE keyword : "keyword" + * | STATS COUNT(*) BY d = DATE_TRUNC(1 day, date) + * becomes + * | EVAL d = ROUND_TO(hire_date, 1697760000000, 1697846400000, 1697932800000) + * | STATS COUNT(*) BY d + * becomes + * [QueryBuilderAndTags[query={ + * "bool" : { + * "filter" : [ + * { + * "match" : { + * "keyword" : { + * "query" : "keyword", + * "lenient" : true + * } + * } + * }, + * { + * "esql_single_value" : { + * "field" : "date", + * "next" : { + * "range" : { + * "date" : { + * "lt" : "2023-10-21T00:00:00.000Z", + * "time_zone" : "Z", + * "format" : "strict_date_optional_time", + * "boost" : 0.0 + * } + * } + * }, + * "source" : "date_trunc(1 day, date)@3:25" + * } + * } + * ], + * "boost" : 1.0 + * } + * }, tags=[1697760000000]], QueryBuilderAndTags[query={ + * "bool" : { + * "filter" : [ + * { + * "match" : { + * "keyword" : { + * "query" : "keyword", + * "lenient" : true + * } + * } + * }, + * { + * "esql_single_value" : { + * "field" : "date", + * "next" : { + * "range" : { + * "date" : { + * "gte" : "2023-10-21T00:00:00.000Z", + * "lt" : "2023-10-22T00:00:00.000Z", + * "time_zone" : "Z", + * "format" : "strict_date_optional_time", + * "boost" : 0.0 + * } + * } + * }, + * "source" : "date_trunc(1 day, date)@3:25" + * } + * } + * ], + * "boost" : 1.0 + * } + * }, tags=[1697846400000]], QueryBuilderAndTags[query={ + * "bool" : { + * "filter" : [ + * { + * "match" : { + * "keyword" : { + * "query" : "keyword", + * "lenient" : true + * } + * } + * }, + * { + * "esql_single_value" : { + * "field" : "date", + * "next" : { + * "range" : { + * "date" : { + * "gte" : "2023-10-22T00:00:00.000Z", + * "time_zone" : "Z", + * "format" : "strict_date_optional_time", + * "boost" : 0.0 + * } + * } + * }, + * "source" : "date_trunc(1 day, date)@3:25" + * } + * } + * ], + * "boost" : 1.0 + * } + * }, tags=[1697932800000]], QueryBuilderAndTags[query={ + * "bool" : { + * "filter" : [ + * { + * "match" : { + * "keyword" : { + * "query" : "keyword", + * "lenient" : true + * } + * } + * }, + * { + * "bool" : { + * "must_not" : [ + * { + * "exists" : { + * "field" : "date", + * "boost" : 0.0 + * } + * } + * ], + * "boost" : 0.0 + * } + * } + * ], + * "boost" : 1.0 + * } + * }, tags=[null]]] + * + * There are some restrictions: + * 1. Tags are not supported by {@code LuceneTopNSourceOperator}, if the sort is pushed down to Lucene, this rewrite does not apply. + * 2. Tags are not supported by {@code TimeSeriesSourceOperator}, this rewrite does not apply to timeseries indices. + * 3. Tags are not supported by {@code LuceneCountOperator}, this rewrite does not apply to {@code EsStatsQueryExec}, count with grouping + * is not supported by {@code EsStatsQueryExec} today. + */ +public class ReplaceRoundToWithQueryAndTags extends PhysicalOptimizerRules.ParameterizedOptimizerRule< + EvalExec, + LocalPhysicalOptimizerContext> { + + private static final Logger logger = LogManager.getLogger(ReplaceRoundToWithQueryAndTags.class); + + @Override + protected PhysicalPlan rule(EvalExec evalExec, LocalPhysicalOptimizerContext ctx) { + PhysicalPlan plan = evalExec; + // TimeSeriesSourceOperator and LuceneTopNSourceOperator do not support QueryAndTags, skip them + // Lookup join is not supported yet + if (evalExec.child() instanceof EsQueryExec queryExec && queryExec.canSubstituteRoundToWithQueryBuilderAndTags()) { + // Look for RoundTo and plan the push down for it. + List roundTos = evalExec.fields() + .stream() + .map(Alias::child) + .filter(RoundTo.class::isInstance) + .map(RoundTo.class::cast) + .toList(); + // It is not clear how to push down multiple RoundTos, dealing with multiple RoundTos is out of the scope of this PR. + if (roundTos.size() == 1) { + RoundTo roundTo = roundTos.get(0); + int count = roundTo.points().size(); + int roundingPointsUpperLimit = roundingPointsThreshold(ctx); + if (count > roundingPointsUpperLimit) { + logger.debug( + "Skipping RoundTo push down for [{}], as it has [{}] points, which is more than [{}]", + roundTo.source(), + count, + roundingPointsUpperLimit + ); + return evalExec; + } + plan = planRoundTo(roundTo, evalExec, queryExec, ctx); + } + } + return plan; + } + + /** + * Rewrite the {@code RoundTo} to a list of {@code QueryBuilderAndTags} as input to {@code EsPhysicalOperationProviders}. + */ + private static PhysicalPlan planRoundTo(RoundTo roundTo, EvalExec evalExec, EsQueryExec queryExec, LocalPhysicalOptimizerContext ctx) { + // Usually EsQueryExec has only one QueryBuilder, one Lucene query, without RoundTo push down. + // If the RoundTo can be pushed down, a list of QueryBuilders with tags will be added into EsQueryExec, and it will be sent to + // EsPhysicalOperationProviders.sourcePhysicalOperation to create a list of LuceneSliceQueue.QueryAndTags + List queryBuilderAndTags = queryBuilderAndTags(roundTo, queryExec, ctx); + if (queryBuilderAndTags == null || queryBuilderAndTags.isEmpty()) { + return evalExec; + } + + FieldAttribute fieldAttribute = (FieldAttribute) roundTo.field(); + String tagFieldName = Attribute.rawTemporaryName( + // $$fieldName$round_to$dateType + fieldAttribute.fieldName().string(), + "round_to", + roundTo.field().dataType().typeName() + ); + FieldAttribute tagField = new FieldAttribute( + roundTo.source(), + tagFieldName, + new EsField(tagFieldName, roundTo.dataType(), Map.of(), false, EsField.TimeSeriesFieldType.NONE) + ); + // Add new tag field to attributes/output + List newAttributes = new ArrayList<>(queryExec.attrs()); + newAttributes.add(tagField); + + // create a new EsQueryExec with newAttributes/output and queryBuilderAndTags + EsQueryExec queryExecWithTags = new EsQueryExec( + queryExec.source(), + queryExec.indexPattern(), + queryExec.indexMode(), + queryExec.indexNameWithModes(), + newAttributes, + queryExec.limit(), + queryExec.sorts(), + queryExec.estimatedRowSize(), + queryBuilderAndTags + ); + + // Replace RoundTo with new tag field in EvalExec + List updatedFields = evalExec.fields() + .stream() + .map(alias -> alias.child() instanceof RoundTo ? alias.replaceChild(tagField) : alias) + .toList(); + + return new EvalExec(evalExec.source(), queryExecWithTags, updatedFields); + } + + private static List queryBuilderAndTags( + RoundTo roundTo, + EsQueryExec queryExec, + LocalPhysicalOptimizerContext ctx + ) { + LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); + Expression field = roundTo.field(); + if (pushdownPredicates.isPushableFieldAttribute(field) == false) { + return null; + } + List roundingPoints = roundTo.points(); + int count = roundingPoints.size(); + DataType dataType = roundTo.dataType(); + // sort rounding points + List points = resolveRoundingPoints(roundingPoints, dataType); + if (points.size() != count || points.isEmpty()) { + return null; + } + List queries = new ArrayList<>(count); + + Object tag = points.get(0); + if (points.size() == 1) { // if there is only one rounding point, just tag the main query + EsQueryExec.QueryBuilderAndTags queryBuilderAndTags = tagOnlyBucket(queryExec, tag); + queries.add(queryBuilderAndTags); + } else { + Source source = roundTo.source(); + Object lower = null; + Object upper = null; + Queries.Clause clause = queryExec.hasScoring() ? Queries.Clause.MUST : Queries.Clause.FILTER; + ZoneId zoneId = ctx.configuration().zoneId(); + for (int i = 1; i < count; i++) { + upper = points.get(i); + // build predicates and range queries for RoundTo ranges + queries.add(rangeBucket(source, field, dataType, lower, upper, tag, zoneId, queryExec, pushdownPredicates, clause)); + lower = upper; + tag = upper; + } + // build the last/gte bucket + queries.add(rangeBucket(source, field, dataType, lower, null, lower, zoneId, queryExec, pushdownPredicates, clause)); + // build null bucket + queries.add(nullBucket(source, field, queryExec, pushdownPredicates, clause)); + } + return queries; + } + + private static List resolveRoundingPoints(List roundingPoints, DataType dataType) { + List points = new ArrayList<>(roundingPoints.size()); + for (Expression e : roundingPoints) { + if (e instanceof Literal l && l.value() instanceof Number n) { + switch (dataType) { + case INTEGER -> points.add(n.intValue()); + case LONG, DATETIME, DATE_NANOS -> points.add(safeToLong(n)); + case DOUBLE -> points.add(n.doubleValue()); + // this should not happen, as RoundTo type resolution will fail with the other data types + default -> throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + } + } + return RoundTo.sortedRoundingPoints(points, dataType); + } + + private static Expression createRangeExpression( + Source source, + Expression field, + DataType dataType, + Object lower, + Object upper, + ZoneId zoneId + ) { + Literal lowerValue = new Literal(source, lower, dataType); + Literal upperValue = new Literal(source, upper, dataType); + if (lower == null) { + return new LessThan(source, field, upperValue, zoneId); + } else if (upper == null) { + return new GreaterThanOrEqual(source, field, lowerValue, zoneId); + } else { + // lower and upper should not be both null + return new Range(source, field, lowerValue, true, upperValue, false, dataType.isDate() ? zoneId : null); + } + } + + private static EsQueryExec.QueryBuilderAndTags tagOnlyBucket(EsQueryExec queryExec, Object tag) { + return new EsQueryExec.QueryBuilderAndTags(queryExec.query(), List.of(tag)); + } + + private static EsQueryExec.QueryBuilderAndTags nullBucket( + Source source, + Expression field, + EsQueryExec queryExec, + LucenePushdownPredicates pushdownPredicates, + Queries.Clause clause + ) { + IsNull isNull = new IsNull(source, field); + List nullTags = new ArrayList<>(1); + nullTags.add(null); + return buildCombinedQueryAndTags(queryExec, pushdownPredicates, isNull, clause, nullTags); + } + + private static EsQueryExec.QueryBuilderAndTags rangeBucket( + Source source, + Expression field, + DataType dataType, + Object lower, + Object upper, + Object tag, + ZoneId zoneId, + EsQueryExec queryExec, + LucenePushdownPredicates pushdownPredicates, + Queries.Clause clause + ) { + Expression range = createRangeExpression(source, field, dataType, lower, upper, zoneId); + return buildCombinedQueryAndTags(queryExec, pushdownPredicates, range, clause, List.of(tag)); + } + + private static EsQueryExec.QueryBuilderAndTags buildCombinedQueryAndTags( + EsQueryExec queryExec, + LucenePushdownPredicates pushdownPredicates, + Expression expression, + Queries.Clause clause, + List tags + ) { + Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, expression); + QueryBuilder mainQuery = queryExec.query(); + QueryBuilder newQuery = queryDSL.toQueryBuilder(); + QueryBuilder combinedQuery = Queries.combine(clause, mainQuery != null ? List.of(mainQuery, newQuery) : List.of(newQuery)); + return new EsQueryExec.QueryBuilderAndTags(combinedQuery, tags); + } + + /** + * Get the rounding points upper limit for {@code RoundTo} pushdown from query level pragmas or cluster level flags. + * If the query level pragmas is set to -1(default), the cluster level flags will be used. + * If the query level pragmas is set to greater than or equals to 0, the query level pragmas will be used. + */ + private int roundingPointsThreshold(LocalPhysicalOptimizerContext ctx) { + int queryLevelRoundingPointsThreshold = ctx.configuration().pragmas().roundToPushDownThreshold(); + int clusterLevelRoundingPointsThreshold = ctx.flags().roundToPushdownThreshold(); + int roundingPointsThreshold; + if (queryLevelRoundingPointsThreshold == ROUNDTO_PUSHDOWN_THRESHOLD.getDefault(ctx.configuration().pragmas().getSettings())) { + roundingPointsThreshold = clusterLevelRoundingPointsThreshold; + } else { + roundingPointsThreshold = queryLevelRoundingPointsThreshold; + } + return roundingPointsThreshold; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java index 4730f561348c9..bc73b869d02e1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java @@ -62,6 +62,16 @@ protected PhysicalPlan rule(EsSourceExec plan) { attributes.add(score); } - return new EsQueryExec(plan.source(), plan.indexPattern(), plan.indexMode(), plan.indexNameWithModes(), attributes, plan.query()); + return new EsQueryExec( + plan.source(), + plan.indexPattern(), + plan.indexMode(), + plan.indexNameWithModes(), + attributes, + null, + null, + null, + List.of(new EsQueryExec.QueryBuilderAndTags(plan.query(), List.of())) + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java index 9312f2abdf509..03c9957754b89 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java @@ -36,7 +36,6 @@ import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.DissectExec; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; -import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; @@ -105,7 +104,6 @@ public static List physical() { CompletionExec.ENTRY, DissectExec.ENTRY, EnrichExec.ENTRY, - EsQueryExec.ENTRY, EsSourceExec.ENTRY, EvalExec.ENTRY, ExchangeExec.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index f29554081ed01..36886b0e58d9b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -7,9 +7,6 @@ package org.elasticsearch.xpack.esql.plan.physical; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; @@ -28,24 +25,13 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.expression.Order; -import org.elasticsearch.xpack.esql.index.EsIndex; -import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; -import static org.elasticsearch.TransportVersions.ESQL_SKIP_ES_INDEX_SERIALIZATION; - public class EsQueryExec extends LeafExec implements EstimatesRowSize { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( - PhysicalPlan.class, - "EsQueryExec", - EsQueryExec::readFrom - ); - public static final EsField DOC_ID_FIELD = new EsField( "_doc", DataType.DOC_DATA_TYPE, @@ -53,13 +39,11 @@ public class EsQueryExec extends LeafExec implements EstimatesRowSize { false, EsField.TimeSeriesFieldType.NONE ); - public static final List NO_SORTS = List.of(); // only exists to mimic older serialization, but we no longer serialize sorts private final String indexPattern; private final IndexMode indexMode; private final Map indexNameWithModes; private final List attrs; - private final QueryBuilder query; private final Expression limit; private final List sorts; @@ -69,6 +53,16 @@ public class EsQueryExec extends LeafExec implements EstimatesRowSize { */ private final Integer estimatedRowSize; + /** + * queryBuilderAndTags may contain one or multiple {@code QueryBuilder}s, it is built on data node. + * If there is one {@code RoundTo} function in the query plan, {@code ReplaceRoundToWithQueryAndTags} rule will build multiple + * {@code QueryBuilder}s in the list, otherwise it is expected to contain only one {@code QueryBuilder} and its tag is + * null. + * It will be used by {@code EsPhysicalOperationProviders}.{@code sourcePhysicalOperation} to create + * {@code LuceneSliceQueue.QueryAndTags} + */ + private final List queryBuilderAndTags; + public interface Sort { SortBuilder sortBuilder(); @@ -86,14 +80,6 @@ public SortBuilder sortBuilder() { builder.unmappedType(field.dataType().esType()); return builder; } - - private static FieldSort readFrom(StreamInput in) throws IOException { - return new EsQueryExec.FieldSort( - FieldAttribute.readFrom(in), - in.readEnum(Order.OrderDirection.class), - in.readEnum(Order.NullsPosition.class) - ); - } } public record GeoDistanceSort(FieldAttribute field, Order.OrderDirection direction, double lat, double lon) implements Sort { @@ -118,16 +104,12 @@ public FieldAttribute field() { } } - public EsQueryExec( - Source source, - String indexPattern, - IndexMode indexMode, - Map indexNameWithModes, - List attributes, - QueryBuilder query - ) { - this(source, indexPattern, indexMode, indexNameWithModes, attributes, query, null, null, null); - } + public record QueryBuilderAndTags(QueryBuilder query, List tags) { + @Override + public String toString() { + return "QueryBuilderAndTags{" + "queryBuilder=[" + query + "], tags=" + tags.toString() + "}"; + } + }; public EsQueryExec( Source source, @@ -135,76 +117,31 @@ public EsQueryExec( IndexMode indexMode, Map indexNameWithModes, List attrs, - QueryBuilder query, Expression limit, List sorts, - Integer estimatedRowSize + Integer estimatedRowSize, + List queryBuilderAndTags ) { super(source); this.indexPattern = indexPattern; this.indexMode = indexMode; this.indexNameWithModes = indexNameWithModes; this.attrs = attrs; - this.query = query; this.limit = limit; this.sorts = sorts; this.estimatedRowSize = estimatedRowSize; - } - - /** - * The matching constructor is used during physical plan optimization and needs valid sorts. But we no longer serialize sorts. - * If this cluster node is talking to an older instance it might receive a plan with sorts, but it will ignore them. - */ - private static EsQueryExec readFrom(StreamInput in) throws IOException { - var source = Source.readFrom((PlanStreamInput) in); - String indexPattern; - Map indexNameWithModes; - if (in.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { - indexPattern = in.readString(); - indexNameWithModes = in.readMap(IndexMode::readFrom); - } else { - var index = EsIndex.readFrom(in); - indexPattern = index.name(); - indexNameWithModes = index.indexNameWithModes(); - } - var indexMode = EsRelation.readIndexMode(in); - var attrs = in.readNamedWriteableCollectionAsList(Attribute.class); - var query = in.readOptionalNamedWriteable(QueryBuilder.class); - var limit = in.readOptionalNamedWriteable(Expression.class); - in.readOptionalCollectionAsList(EsQueryExec::readSort); - var rowSize = in.readOptionalVInt(); - // Ignore sorts from the old serialization format - return new EsQueryExec(source, indexPattern, indexMode, indexNameWithModes, attrs, query, limit, NO_SORTS, rowSize); - } - - private static Sort readSort(StreamInput in) throws IOException { - return FieldSort.readFrom(in); - } - - private static void writeSort(StreamOutput out, Sort sort) { - throw new IllegalStateException("sorts are no longer serialized"); + // cannot keep the ctor with QueryBuilder as it has the same number of arguments as this ctor, EsqlNodeSubclassTests will fail + this.queryBuilderAndTags = queryBuilderAndTags; } @Override public void writeTo(StreamOutput out) throws IOException { - Source.EMPTY.writeTo(out); - if (out.getTransportVersion().onOrAfter(ESQL_SKIP_ES_INDEX_SERIALIZATION)) { - out.writeString(indexPattern); - out.writeMap(indexNameWithModes, (o, v) -> IndexMode.writeTo(v, out)); - } else { - new EsIndex(indexPattern, Map.of(), indexNameWithModes).writeTo(out); - } - EsRelation.writeIndexMode(out, indexMode()); - out.writeNamedWriteableCollection(output()); - out.writeOptionalNamedWriteable(query()); - out.writeOptionalNamedWriteable(limit()); - out.writeOptionalCollection(NO_SORTS, EsQueryExec::writeSort); - out.writeOptionalVInt(estimatedRowSize()); + throw new UnsupportedOperationException("not serialized"); } @Override public String getWriteableName() { - return ENTRY.name; + throw new UnsupportedOperationException("not serialized"); } public static boolean isSourceAttribute(Attribute attr) { @@ -229,10 +166,10 @@ protected NodeInfo info() { indexMode, indexNameWithModes, attrs, - query, limit, sorts, - estimatedRowSize + estimatedRowSize, + queryBuilderAndTags ); } @@ -248,8 +185,13 @@ public Map indexNameWithModes() { return indexNameWithModes; } + /** + * query is merged into queryBuilderAndTags, keep this method as it is called by many many places in both tests and product code. + * If this method is called, the caller looks for the original queryBuilder, before {@code ReplaceRoundToWithQueryAndTags} converts it + * to multiple queries with tags. + */ public QueryBuilder query() { - return query; + return queryWithoutTag(); } @Override @@ -291,13 +233,23 @@ public PhysicalPlan estimateRowSize(State state) { } return Objects.equals(this.estimatedRowSize, size) ? this - : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, size); + : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, limit, sorts, size, queryBuilderAndTags); } public EsQueryExec withLimit(Expression limit) { return Objects.equals(this.limit, limit) ? this - : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, estimatedRowSize); + : new EsQueryExec( + source(), + indexPattern, + indexMode, + indexNameWithModes, + attrs, + limit, + sorts, + estimatedRowSize, + queryBuilderAndTags + ); } public boolean canPushSorts() { @@ -311,18 +263,79 @@ public EsQueryExec withSorts(List sorts) { } return Objects.equals(this.sorts, sorts) ? this - : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, estimatedRowSize); + : new EsQueryExec( + source(), + indexPattern, + indexMode, + indexNameWithModes, + attrs, + limit, + sorts, + estimatedRowSize, + queryBuilderAndTags + ); } + /** + * query is merged into queryBuilderAndTags, keep this method as it is called by too many places. + * If this method is called, the caller looks for the original queryBuilder, before {@code ReplaceRoundToWithQueryAndTags} converts it + * to multiple queries with tags. + */ public EsQueryExec withQuery(QueryBuilder query) { - return Objects.equals(this.query, query) + QueryBuilder thisQuery = queryWithoutTag(); + return Objects.equals(thisQuery, query) ? this - : new EsQueryExec(source(), indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts, estimatedRowSize); + : new EsQueryExec( + source(), + indexPattern, + indexMode, + indexNameWithModes, + attrs, + limit, + sorts, + estimatedRowSize, + List.of(new QueryBuilderAndTags(query, List.of())) + ); + } + + public List queryBuilderAndTags() { + return queryBuilderAndTags; + } + + public boolean canSubstituteRoundToWithQueryBuilderAndTags() { + // TimeSeriesSourceOperator and LuceneTopNSourceOperator do not support QueryAndTags + return indexMode != IndexMode.TIME_SERIES && (sorts == null || sorts.isEmpty()); + } + + /** + * Returns the original queryBuilder before {@code ReplaceRoundToWithQueryAndTags} converts it to multiple queryBuilder with tags. + * If we reach here, the caller is looking for the original query before the rule converts it. If there are multiple queries in + * queryBuilderAndTags or if the single query in queryBuilderAndTags already has a tag, that means + * {@code ReplaceRoundToWithQueryAndTags} already applied to the original query, the original query cannot be retrieved any more, + * exception will be thrown. + */ + private QueryBuilder queryWithoutTag() { + QueryBuilder queryWithoutTag; + if (queryBuilderAndTags == null || queryBuilderAndTags.isEmpty()) { + return null; + } else if (queryBuilderAndTags.size() == 1) { + QueryBuilderAndTags firstQuery = this.queryBuilderAndTags.get(0); + if (firstQuery.tags().isEmpty()) { + queryWithoutTag = firstQuery.query(); + } else { + throw new UnsupportedOperationException("query is converted to query with tags: " + "[" + firstQuery + "]"); + } + } else { + throw new UnsupportedOperationException( + "query is converted to multiple queries and tags: " + "[" + this.queryBuilderAndTags + "]" + ); + } + return queryWithoutTag; } @Override public int hashCode() { - return Objects.hash(indexPattern, indexMode, indexNameWithModes, attrs, query, limit, sorts); + return Objects.hash(indexPattern, indexMode, indexNameWithModes, attrs, limit, sorts, queryBuilderAndTags); } @Override @@ -340,10 +353,10 @@ public boolean equals(Object obj) { && Objects.equals(indexMode, other.indexMode) && Objects.equals(indexNameWithModes, other.indexNameWithModes) && Objects.equals(attrs, other.attrs) - && Objects.equals(query, other.query) && Objects.equals(limit, other.limit) && Objects.equals(sorts, other.sorts) - && Objects.equals(estimatedRowSize, other.estimatedRowSize); + && Objects.equals(estimatedRowSize, other.estimatedRowSize) + && Objects.equals(queryBuilderAndTags, other.queryBuilderAndTags); } @Override @@ -355,9 +368,6 @@ public String nodeString() { + "indexMode[" + indexMode + "], " - + "query[" - + (query != null ? Strings.toString(query, false, true) : "") - + "]" + NodeUtils.limitedToString(attrs) + ", limit[" + (limit != null ? limit.toString() : "") @@ -365,6 +375,8 @@ public String nodeString() { + (sorts != null ? sorts.toString() : "") + "] estimatedRowSize[" + estimatedRowSize + + "] queryBuilderAndTags [" + + (queryBuilderAndTags != null ? queryBuilderAndTags.toString() : "") + "]"; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 5b32cdbbacdc9..9520f68580731 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -266,6 +266,18 @@ public Function List.of(new LuceneSliceQueue.QueryAndTags(shardContexts.get(ctx.index()).toQuery(qb), List.of())); } + public Function> querySupplier( + List queryAndTagsFromEsQueryExec + ) { + return ctx -> queryAndTagsFromEsQueryExec.stream().map(queryBuilderAndTags -> { + QueryBuilder qb = queryBuilderAndTags.query(); + return new LuceneSliceQueue.QueryAndTags( + shardContexts.get(ctx.index()).toQuery(qb == null ? QueryBuilders.matchAllQuery().boost(0.0f) : qb), + queryBuilderAndTags.tags() + ); + }).toList(); + } + @Override public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalExecutionPlannerContext context) { if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) { @@ -285,6 +297,8 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, for (Sort sort : sorts) { sortBuilders.add(sort.sortBuilder()); } + // LuceneTopNSourceOperator does not support QueryAndTags, if there are multiple queries or if the single query has tags, + // UnsupportedOperationException will be thrown by esQueryExec.query() luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), @@ -298,7 +312,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, } else { luceneFactory = new LuceneSourceOperator.Factory( shardContexts, - querySupplier(esQueryExec.query()), + querySupplier(esQueryExec.queryBuilderAndTags()), context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()), context.autoPartitioningStrategy().get(), context.queryPragmas().taskConcurrency(), @@ -335,7 +349,7 @@ List extractFields(FieldExtractExec fieldE List fieldInfos = new ArrayList<>(attributes.size()); Set nullsFilteredFields = new HashSet<>(); fieldExtractExec.forEachDown(EsQueryExec.class, queryExec -> { - QueryBuilder q = queryExec.query(); + QueryBuilder q = queryExec.queryBuilderAndTags().get(0).query(); if (q != null) { nullsFilteredFields.addAll(nullsFilteredFieldsAfterSourceQuery(q)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java index 09ef93d56e9ee..a7eacf383fc73 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; public class EsqlFlags { public static final Setting ESQL_STRING_LIKE_ON_INDEX = Setting.boolSetting( @@ -18,17 +19,65 @@ public class EsqlFlags { Setting.Property.Dynamic ); + /** + * The maximum number of rounding points to push down to Lucene for the {@code roundTo} function at cluster level. + * {@code ReplaceRoundToWithQueryAndTags} checks this threshold before rewriting {@code RoundTo} to range queries. + * + * There is also a query level ROUNDTO_PUSHDOWN_THRESHOLD defined in {@code QueryPragmas}. + * The cluster level threshold defaults to 127, it is the same as the maximum number of buckets used in {@code Rounding}. + * The query level threshold defaults to -1, which means this query level setting is not set and cluster level upper limit will be used. + * If query level threshold is set to greater than or equals to 0, the query level threshold will be used, and it overrides the cluster + * level threshold. + * + * If the cluster level threshold is set to -1 or 0, no {@code RoundTo} pushdown will be performed, query level threshold is not set to + * -1 or 0. + */ + public static final Setting ESQL_ROUNDTO_PUSHDOWN_THRESHOLD = Setting.intSetting( + "esql.query.roundto_pushdown_threshold", + 127, + -1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private final boolean stringLikeOnIndex; + private final int roundToPushdownThreshold; + + /** + * Constructor for tests. + */ public EsqlFlags(boolean stringLikeOnIndex) { this.stringLikeOnIndex = stringLikeOnIndex; + this.roundToPushdownThreshold = ESQL_ROUNDTO_PUSHDOWN_THRESHOLD.getDefault(Settings.EMPTY); + } + + /** + * Constructor for tests. + */ + public EsqlFlags(int roundToPushdownThreshold) { + this.stringLikeOnIndex = ESQL_STRING_LIKE_ON_INDEX.getDefault(Settings.EMPTY); + this.roundToPushdownThreshold = roundToPushdownThreshold; + } + + /** + * Constructor for tests. + */ + public EsqlFlags(boolean stringLikeOnIndex, int roundToPushdownThreshold) { + this.stringLikeOnIndex = stringLikeOnIndex; + this.roundToPushdownThreshold = roundToPushdownThreshold; } public EsqlFlags(ClusterSettings settings) { this.stringLikeOnIndex = settings.get(ESQL_STRING_LIKE_ON_INDEX); + this.roundToPushdownThreshold = settings.get(ESQL_ROUNDTO_PUSHDOWN_THRESHOLD); } public boolean stringLikeOnIndex() { return stringLikeOnIndex; } + + public int roundToPushdownThreshold() { + return roundToPushdownThreshold; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 64e205e68d6fe..abd19b92366f8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -260,7 +260,8 @@ public List> getSettings() { PhysicalSettings.DEFAULT_DATA_PARTITIONING, PhysicalSettings.VALUES_LOADING_JUMBO_SIZE, STORED_FIELDS_SEQUENTIAL_PROPORTION, - EsqlFlags.ESQL_STRING_LIKE_ON_INDEX + EsqlFlags.ESQL_STRING_LIKE_ON_INDEX, + EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index bdd0e382c3fd3..cafe113f91f9a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -80,6 +80,20 @@ public final class QueryPragmas implements Writeable { MappedFieldType.FieldExtractPreference.NONE ); + /** + * The maximum number of rounding points to push down to Lucene for the {@code roundTo} function at query level. + * {@code ReplaceRoundToWithQueryAndTags} checks this threshold before rewriting {@code RoundTo} to range queries. + * + * There is also a cluster level ESQL_ROUNDTO_PUSHDOWN_THRESHOLD defined in {@code EsqlFlags}. + * The query level threshold defaults to -1, which means this query level setting is not set and cluster level upper limit will be used. + * The cluster level threshold defaults to 127, it is the same as the maximum number of buckets used in {@code Rounding}. + * If query level threshold is set to greater than or equals to 0, the query level threshold will be used, and it overrides the cluster + * level threshold. + * + * If the query level threshold is set to 0, no {@code RoundTo} pushdown will be performed. + */ + public static final Setting ROUNDTO_PUSHDOWN_THRESHOLD = Setting.intSetting("roundto_pushdown_threshold", -1, -1); + public static final QueryPragmas EMPTY = new QueryPragmas(Settings.EMPTY); private final Settings settings; @@ -197,6 +211,10 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() { return FIELD_EXTRACT_PREFERENCE.get(settings); } + public int roundToPushDownThreshold() { + return ROUNDTO_PUSHDOWN_THRESHOLD.get(settings); + } + public boolean isEmpty() { return settings.isEmpty(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index f50b83833198f..9b161388d6cc3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -172,7 +172,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase { public static final String MATCH_OPERATOR_QUERY = "from test | where %s:%s"; public static final String MATCH_FUNCTION_QUERY = "from test | where match(%s, %s)"; - private TestPlannerOptimizer plannerOptimizer; + protected TestPlannerOptimizer plannerOptimizer; private TestPlannerOptimizer plannerOptimizerDateDateNanosUnionTypes; private Analyzer timeSeriesAnalyzer; private final Configuration config; @@ -261,7 +261,7 @@ private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichRes ); } - private Analyzer makeAnalyzer(String mappingFileName) { + protected Analyzer makeAnalyzer(String mappingFileName) { return makeAnalyzer(mappingFileName, new EnrichResolution()); } @@ -2510,7 +2510,7 @@ private boolean isMultiTypeEsField(Expression e) { return e instanceof FieldAttribute fa && fa.field() instanceof MultiTypeEsField; } - private QueryBuilder wrapWithSingleQuery(String query, QueryBuilder inner, String fieldName, Source source) { + protected static QueryBuilder wrapWithSingleQuery(String query, QueryBuilder inner, String fieldName, Source source) { return FilterTests.singleValueQuery(query, inner, fieldName, source); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 761902bebe19e..4bd6fa6737041 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -55,7 +55,15 @@ public PhysicalPlan plan(String query, SearchStats stats, Analyzer analyzer) { return physical; } + public PhysicalPlan plan(String query, SearchStats stats, EsqlFlags esqlFlags) { + return optimizedPlan(physicalPlan(query, analyzer), stats, esqlFlags); + } + private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) { + return optimizedPlan(plan, searchStats, new EsqlFlags(true)); + } + + private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, EsqlFlags esqlFlags) { // System.out.println("* Physical Before\n" + plan); var physicalPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(plan)); // System.out.println("* Physical After\n" + physicalPlan); @@ -67,7 +75,7 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) { new LocalLogicalOptimizerContext(config, FoldContext.small(), searchStats) ); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(new EsqlFlags(true), config, FoldContext.small(), searchStats), + new LocalPhysicalOptimizerContext(esqlFlags, config, FoldContext.small(), searchStats), true ); var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceDateTruncBucketWithRoundToTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceDateTruncBucketWithRoundToTests.java index 225cab8330b84..aef6ba3c2abab 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceDateTruncBucketWithRoundToTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceDateTruncBucketWithRoundToTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical.local; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Expression; @@ -34,7 +33,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME; -@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug") +//@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug") public class ReplaceDateTruncBucketWithRoundToTests extends LocalLogicalPlanOptimizerTests { // Key is the predicate, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java index 15fc4d75f4f13..8e99e412338be 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java @@ -585,7 +585,17 @@ public TestPhysicalPlanBuilder limit(int limit) { public TopNExec build() { List attributes = new ArrayList<>(fields.values()); - PhysicalPlan child = new EsQueryExec(Source.EMPTY, this.index, indexMode, Map.of(), attributes, null, null, List.of(), 0); + PhysicalPlan child = new EsQueryExec( + Source.EMPTY, + this.index, + indexMode, + Map.of(), + attributes, + null, + List.of(), + 0, + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) + ); if (aliases.isEmpty() == false) { child = new EvalExec(Source.EMPTY, child, aliases); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java new file mode 100644 index 0000000000000..79c484c916af5 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java @@ -0,0 +1,641 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; + +import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Expressions; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.core.expression.function.Function; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; +import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc; +import org.elasticsearch.xpack.esql.expression.function.scalar.math.RoundTo; +import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizerTests; +import org.elasticsearch.xpack.esql.optimizer.TestPlannerOptimizer; +import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.EvalExec; +import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec; +import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.LimitExec; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; +import org.elasticsearch.xpack.esql.plan.physical.MergeExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; +import org.elasticsearch.xpack.esql.plan.physical.TopNExec; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.stats.SearchStats; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL; +import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL; +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.existsQuery; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; +import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.DEFAULT_DATE_NANOS_FORMATTER; +import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.DEFAULT_DATE_TIME_FORMATTER; +import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateNanosToLong; +import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToLong; +import static org.hamcrest.Matchers.is; + +//@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug") +public class ReplaceRoundToWithQueryAndTagsTests extends LocalPhysicalPlanOptimizerTests { + + public ReplaceRoundToWithQueryAndTagsTests(String name, Configuration config) { + super(name, config); + } + + private static final List dateHistograms = List.of( + "date_trunc(1 day, date)", + "bucket(date, 1 day)", + "round_to(date, \"2023-10-20\", \"2023-10-21\", \"2023-10-22\", \"2023-10-23\")" + ); + + private static final Map> roundToAllTypes = new HashMap<>( + Map.ofEntries( + Map.entry("byte", List.of(2, 1, 3, 4)), + Map.entry("short", List.of(1, 3, 2, 4)), + Map.entry("integer", List.of(1, 2, 3, 4)), + Map.entry("long", List.of(1697760000000L, 1697846400000L, 1697932800000L, 1698019200000L)), + Map.entry("float", List.of(3.0, 2.0, 1.0, 4.0)), + Map.entry("half_float", List.of(4.0, 2.0, 3.0, 1.0)), + Map.entry("scaled_float", List.of(4.0, 3.0, 2.0, 1.0)), + Map.entry("double", List.of(1.0, 2.0, 3.0, 4.0)), + Map.entry("date", List.of("\"2023-10-20\"::date", "\"2023-10-21\"::date", "\"2023-10-22\"::date", "\"2023-10-23\"::date")), + Map.entry( + "date_nanos", + List.of( + "\"2023-10-20\"::date_nanos", + "\"2023-10-21\"::date_nanos", + "\"2023-10-22\"::date_nanos", + "\"2023-10-23\"::date_nanos" + ) + ) + ) + ); + + private static final Map otherPushDownFunctions = new HashMap<>( + Map.ofEntries( + Map.entry("keyword == \"keyword\"", termQuery("keyword", "keyword").boost(0)), + Map.entry( + "date >= \"2023-10-19\" and date <= \"2023-10-24\"", + rangeQuery("date").gte("2023-10-19T00:00:00.000Z") + .lte("2023-10-24T00:00:00.000Z") + .timeZone("Z") + .boost(0) + .format(DEFAULT_DATE_TIME_FORMATTER.pattern()) + ), + Map.entry("keyword : \"keyword\"", matchQuery("keyword", "keyword").lenient(true)) + ) + ); + + // The date range of SearchStats is from 2023-10-20 to 2023-10-23. + private static final SearchStats searchStats = searchStats(); + + // DateTrunc/Bucket is transformed to RoundTo first and then to QueryAndTags + public void testDateTruncBucketTransformToQueryAndTags() { + for (String dateHistogram : dateHistograms) { + String query = LoggerMessageFormat.format(null, """ + from test + | stats count(*) by x = {} + """, dateHistogram); + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + LimitExec limit = as(plan, LimitExec.class); + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(DataType.DATETIME, grouping.dataType()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + ExchangeExec exchange = as(agg.child(), ExchangeExec.class); + assertThat(exchange.inBetweenAggs(), is(true)); + agg = as(exchange.child(), AggregateExec.class); + EvalExec eval = as(agg.child(), EvalExec.class); + List aliases = eval.fields(); + assertEquals(1, aliases.size()); + FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); + assertEquals("$$date$round_to$datetime", roundToTag.name()); + EsQueryExec esQueryExec = as(eval.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( + query, + "date", + List.of(), + new Source(2, 24, dateHistogram), + null + ); + verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); + assertThrows(UnsupportedOperationException.class, esQueryExec::query); + } + } + + // DateTrunc is transformed to RoundTo first but cannot be transformed to QueryAndTags, when the TopN is pushed down to EsQueryExec + public void testDateTruncNotTransformToQueryAndTags() { + for (String dateHistogram : dateHistograms) { + if (dateHistogram.contains("bucket")) { // bucket cannot be used out side of stats + continue; + } + String query = LoggerMessageFormat.format(null, """ + from test + | sort date + | eval x = {} + | keep alias_integer, date, x + | limit 5 + """, dateHistogram); + + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + ProjectExec projectExec = as(plan, ProjectExec.class); + TopNExec topNExec = as(projectExec.child(), TopNExec.class); + ExchangeExec exchangeExec = as(topNExec.child(), ExchangeExec.class); + projectExec = as(exchangeExec.child(), ProjectExec.class); + FieldExtractExec fieldExtractExec = as(projectExec.child(), FieldExtractExec.class); + EvalExec evalExec = as(fieldExtractExec.child(), EvalExec.class); + List aliases = evalExec.fields(); + assertEquals(1, aliases.size()); + RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + assertEquals(4, roundTo.points().size()); + fieldExtractExec = as(evalExec.child(), FieldExtractExec.class); + EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + assertEquals(1, queryBuilderAndTags.size()); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + assertNull(queryBuilder.query()); + assertTrue(queryBuilder.tags().isEmpty()); + assertNull(esQueryExec.query()); + } + } + + // RoundTo(all numeric data types) is transformed to QueryAndTags + public void testRoundToTransformToQueryAndTags() { + for (Map.Entry> roundTo : roundToAllTypes.entrySet()) { + String fieldName = roundTo.getKey(); + List roundingPoints = roundTo.getValue(); + String expression = "round_to(" + + fieldName + + ", " + + roundingPoints.stream().map(Object::toString).collect(Collectors.joining(",")) + + ")"; + String query = LoggerMessageFormat.format(null, """ + from test + | stats count(*) by x = {} + """, expression); + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + LimitExec limit = as(plan, LimitExec.class); + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + ExchangeExec exchange = as(agg.child(), ExchangeExec.class); + assertThat(exchange.inBetweenAggs(), is(true)); + agg = as(exchange.child(), AggregateExec.class); + EvalExec eval = as(agg.child(), EvalExec.class); + List aliases = eval.fields(); + assertEquals(1, aliases.size()); + FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); + assertTrue(roundToTag.name().startsWith("$$" + fieldName + "$round_to$")); + EsQueryExec esQueryExec = as(eval.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( + query, + fieldName, + roundingPoints, + new Source(2, 24, expression), + null + ); + verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); + assertThrows(UnsupportedOperationException.class, esQueryExec::query); + } + } + + // test if the combine query is generated correctly when there are other functions that can be pushed down + public void testDateTruncBucketTransformToQueryAndTagsWithOtherPushdownFunctions() { + for (String dateHistogram : dateHistograms) { + for (Map.Entry otherPushDownFunction : otherPushDownFunctions.entrySet()) { + String predicate = otherPushDownFunction.getKey(); + QueryBuilder qb = otherPushDownFunction.getValue(); + String query = LoggerMessageFormat.format(null, """ + from test + | where {} + | stats count(*) by x = {} + """, predicate, dateHistogram); + QueryBuilder mainQueryBuilder = qb instanceof MatchQueryBuilder + ? qb + : wrapWithSingleQuery( + query, + qb, + predicate.contains("and") ? "date" : "keyword", + new Source(2, 8, predicate.contains("and") ? predicate.substring(0, 20) : predicate) + ); + + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + LimitExec limit = as(plan, LimitExec.class); + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(DataType.DATETIME, grouping.dataType()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + ExchangeExec exchange = as(agg.child(), ExchangeExec.class); + assertThat(exchange.inBetweenAggs(), is(true)); + agg = as(exchange.child(), AggregateExec.class); + EvalExec eval = as(agg.child(), EvalExec.class); + List aliases = eval.fields(); + assertEquals(1, aliases.size()); + FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); + assertEquals("$$date$round_to$datetime", roundToTag.name()); + EsQueryExec esQueryExec = as(eval.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + List expectedQueryBuilderAndTags = expectedQueryBuilderAndTags( + query, + "date", + List.of(), + new Source(3, 24, dateHistogram), + mainQueryBuilder + ); + verifyQueryAndTags(expectedQueryBuilderAndTags, queryBuilderAndTags); + assertThrows(UnsupportedOperationException.class, esQueryExec::query); + } + } + } + + // ReplaceRoundToWithQueryAndTags does not support lookup joins yet + public void testDateTruncBucketNotTransformToQueryAndTagsWithLookupJoin() { + for (String dateHistogram : dateHistograms) { + String query = LoggerMessageFormat.format(null, """ + from test + | rename integer as language_code + | lookup join languages_lookup on language_code + | stats count(*) by x = {} + """, dateHistogram); + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + LimitExec limit = as(plan, LimitExec.class); + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(DataType.DATETIME, grouping.dataType()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + ExchangeExec exchange = as(agg.child(), ExchangeExec.class); + assertThat(exchange.inBetweenAggs(), is(true)); + agg = as(exchange.child(), AggregateExec.class); + EvalExec eval = as(agg.child(), EvalExec.class); + List aliases = eval.fields(); + assertEquals(1, aliases.size()); + RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + assertEquals(4, roundTo.points().size()); + FieldExtractExec fieldExtractExec = as(eval.child(), FieldExtractExec.class); + List attributes = fieldExtractExec.attributesToExtract(); + assertEquals(1, attributes.size()); + assertEquals("date", attributes.get(0).name()); + LookupJoinExec lookupJoinExec = as(fieldExtractExec.child(), LookupJoinExec.class); // this is why the rule doesn't apply + // lhs of lookup join + fieldExtractExec = as(lookupJoinExec.left(), FieldExtractExec.class); + attributes = fieldExtractExec.attributesToExtract(); + assertEquals(1, attributes.size()); + assertEquals("integer", attributes.get(0).name()); + EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); + assertEquals("test", esQueryExec.indexPattern()); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + assertEquals(1, queryBuilderAndTags.size()); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + assertNull(queryBuilder.query()); + assertTrue(queryBuilder.tags().isEmpty()); + assertNull(esQueryExec.query()); + // rhs of lookup join + esQueryExec = as(lookupJoinExec.right(), EsQueryExec.class); + assertEquals("languages_lookup", esQueryExec.indexPattern()); + queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + assertEquals(1, queryBuilderAndTags.size()); + queryBuilder = queryBuilderAndTags.get(0); + assertNull(queryBuilder.query()); + assertTrue(queryBuilder.tags().isEmpty()); + assertNull(esQueryExec.query()); + } + } + + // ReplaceRoundToWithQueryAndTags does not support lookup joins yet + public void testDateTruncBucketNotTransformToQueryAndTagsWithFork() { + for (String dateHistogram : dateHistograms) { + String query = LoggerMessageFormat.format(null, """ + from test + | fork (where integer > 100) + (where keyword : "keyword") + | stats count(*) by x = {} + """, dateHistogram); + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + LimitExec limit = as(plan, LimitExec.class); + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(DataType.DATETIME, grouping.dataType()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + agg = as(agg.child(), AggregateExec.class); + assertThat(agg.getMode(), is(INITIAL)); + groupings = agg.groupings(); + grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(DataType.DATETIME, grouping.dataType()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + EvalExec eval = as(agg.child(), EvalExec.class); + List aliases = eval.fields(); + assertEquals(1, aliases.size()); + var function = as(aliases.get(0).child(), Function.class); + ReferenceAttribute fa = null; // if merge returns FieldAttribute instead of ReferenceAttribute, the rule might apply + if (function instanceof DateTrunc dateTrunc) { + fa = as(dateTrunc.field(), ReferenceAttribute.class); + } else if (function instanceof Bucket bucket) { + fa = as(bucket.field(), ReferenceAttribute.class); + } else if (function instanceof RoundTo roundTo) { + fa = as(roundTo.field(), ReferenceAttribute.class); + } + assertNotNull(fa); + assertEquals("date", fa.name()); + assertEquals(DataType.DATETIME, fa.dataType()); + MergeExec mergeExec = as(eval.child(), MergeExec.class); + } + } + + /** + * If the number of rounding points is 127 or less, the query is rewritten to QueryAndTags. + * If the number of rounding points is 128 or more, the query is not rewritten. + */ + public void testRoundToTransformToQueryAndTagsWithDefaultUpperLimit() { + for (int numOfPoints : List.of(127, 128)) { + StringBuilder points = new StringBuilder(); + for (int i = 0; i < numOfPoints; i++) { + if (i > 0) { + points.append(", "); + } + points.append(i); + } + String query = LoggerMessageFormat.format(null, """ + from test + | stats count(*) by x = round_to(integer, {}) + """, points.toString()); + + PhysicalPlan plan = plannerOptimizer.plan(query, searchStats, makeAnalyzer("mapping-all-types.json")); + + LimitExec limit = as(plan, LimitExec.class); + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(DataType.INTEGER, grouping.dataType()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + ExchangeExec exchange = as(agg.child(), ExchangeExec.class); + assertThat(exchange.inBetweenAggs(), is(true)); + agg = as(exchange.child(), AggregateExec.class); + EvalExec evalExec = as(agg.child(), EvalExec.class); + List aliases = evalExec.fields(); + assertEquals(1, aliases.size()); + if (numOfPoints == 127) { + FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); + assertTrue(roundToTag.name().startsWith("$$integer$round_to$")); + EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + assertEquals(128, queryBuilderAndTags.size()); // 127 + nullBucket + assertThrows(UnsupportedOperationException.class, esQueryExec::query); + } else { // numOfPoints == 128, query rewrite does not happen + RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + assertEquals(128, roundTo.points().size()); + FieldExtractExec fieldExtractExec = as(evalExec.child(), FieldExtractExec.class); + EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + assertEquals(1, queryBuilderAndTags.size()); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + assertNull(queryBuilder.query()); + assertTrue(queryBuilder.tags().isEmpty()); + assertNull(esQueryExec.query()); + } + } + } + + /** + * Query level threshold(if greater than -1) set in QueryPragmas overrides the cluster level threshold set in EsqlFlags. + */ + public void testRoundToTransformToQueryAndTagsWithCustomizedUpperLimit() { + for (int clusterLevelThreshold : List.of(-1, 0, 60, 126, 128, 256)) { + for (int queryLevelThreshold : List.of(-1, 0, 60, 126, 128, 256)) { + StringBuilder points = new StringBuilder(); // there are 127 rounding points + for (int i = 0; i < 127; i++) { + if (i > 0) { + points.append(", "); + } + points.append(i); + } + String query = LoggerMessageFormat.format(null, """ + from test + | stats count(*) by x = round_to(integer, {}) + """, points.toString()); + + TestPlannerOptimizer plannerOptimizerWithPragmas = new TestPlannerOptimizer( + configuration( + new QueryPragmas( + Settings.builder() + .put(QueryPragmas.ROUNDTO_PUSHDOWN_THRESHOLD.getKey().toLowerCase(Locale.ROOT), queryLevelThreshold) + .build() + ), + query + ), + makeAnalyzer("mapping-all-types.json") + ); + EsqlFlags esqlFlags = new EsqlFlags(clusterLevelThreshold); + assertEquals(clusterLevelThreshold, esqlFlags.roundToPushdownThreshold()); + assertTrue(esqlFlags.stringLikeOnIndex()); + PhysicalPlan plan = plannerOptimizerWithPragmas.plan(query, searchStats, esqlFlags); + boolean pushdown = false; + if (queryLevelThreshold > -1) { + pushdown = queryLevelThreshold >= 127; + } else { + pushdown = clusterLevelThreshold >= 127; + } + + LimitExec limit = as(plan, LimitExec.class); + AggregateExec agg = as(limit.child(), AggregateExec.class); + assertThat(agg.getMode(), is(FINAL)); + List groupings = agg.groupings(); + NamedExpression grouping = as(groupings.get(0), NamedExpression.class); + assertEquals("x", grouping.name()); + assertEquals(DataType.INTEGER, grouping.dataType()); + assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates())); + ExchangeExec exchange = as(agg.child(), ExchangeExec.class); + assertThat(exchange.inBetweenAggs(), is(true)); + agg = as(exchange.child(), AggregateExec.class); + EvalExec evalExec = as(agg.child(), EvalExec.class); + List aliases = evalExec.fields(); + assertEquals(1, aliases.size()); + if (pushdown) { + FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class); + assertTrue(roundToTag.name().startsWith("$$integer$round_to$")); + EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + assertEquals(128, queryBuilderAndTags.size()); // 127 + nullBucket + assertThrows(UnsupportedOperationException.class, esQueryExec::query); + } else { // query rewrite does not happen + RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class); + assertEquals(127, roundTo.points().size()); + FieldExtractExec fieldExtractExec = as(evalExec.child(), FieldExtractExec.class); + EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class); + List queryBuilderAndTags = esQueryExec.queryBuilderAndTags(); + assertEquals(1, queryBuilderAndTags.size()); + EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0); + assertNull(queryBuilder.query()); + assertTrue(queryBuilder.tags().isEmpty()); + assertNull(esQueryExec.query()); + } + } + } + } + + private static void verifyQueryAndTags(List expected, List actual) { + assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + EsQueryExec.QueryBuilderAndTags expectedItem = expected.get(i); + EsQueryExec.QueryBuilderAndTags actualItem = actual.get(i); + assertEquals(expectedItem.query().toString(), actualItem.query().toString()); + assertEquals(expectedItem.tags().get(0), actualItem.tags().get(0)); + } + } + + private static List expectedQueryBuilderAndTags( + String query, + String fieldName, + List roundingPoints, + Source source, + QueryBuilder mainQueryBuilder + ) { + List expected = new ArrayList<>(5); + boolean isDateField = fieldName.equals("date"); + boolean isDateNanosField = fieldName.equals("date_nanos"); + boolean isNumericField = (isDateField || isDateNanosField) == false; + List> rangeAndTags = isNumericField ? numericBuckets(roundingPoints) : dateBuckets(isDateField); + for (List rangeAndTag : rangeAndTags) { + Object lower = rangeAndTag.get(0); + Object upper = rangeAndTag.get(1); + Object tag = rangeAndTag.get(2); + RangeQueryBuilder rangeQueryBuilder; + if (isNumericField) { + rangeQueryBuilder = rangeQuery(fieldName).boost(0); + } else if (isDateField) { // date + rangeQueryBuilder = rangeQuery(fieldName).boost(0).timeZone("Z").format(DEFAULT_DATE_TIME_FORMATTER.pattern()); + } else { // date_nanos + rangeQueryBuilder = rangeQuery(fieldName).boost(0).timeZone("Z").format(DEFAULT_DATE_NANOS_FORMATTER.pattern()); + } + if (upper != null) { + rangeQueryBuilder = rangeQueryBuilder.lt(upper); + } + if (lower != null) { + rangeQueryBuilder = rangeQueryBuilder.gte(lower); + } + + QueryBuilder qb = wrapWithSingleQuery(query, rangeQueryBuilder, fieldName, source); + if (mainQueryBuilder != null) { + qb = boolQuery().filter(mainQueryBuilder).filter(qb); + } + expected.add(new EsQueryExec.QueryBuilderAndTags(qb, List.of(tag))); + } + // add null bucket + BoolQueryBuilder isNullQueryBuilder = boolQuery().mustNot(existsQuery(fieldName).boost(0)); + List nullTags = new ArrayList<>(1); + nullTags.add(null); + if (mainQueryBuilder != null) { + isNullQueryBuilder = boolQuery().filter(mainQueryBuilder).filter(isNullQueryBuilder.boost(0)); + } + expected.add(new EsQueryExec.QueryBuilderAndTags(isNullQueryBuilder, nullTags)); + return expected; + } + + private static List> dateBuckets(boolean isDate) { + // Date rounding points + String[] dates = { "2023-10-20T00:00:00.000Z", "2023-10-21T00:00:00.000Z", "2023-10-22T00:00:00.000Z", "2023-10-23T00:00:00.000Z" }; + + // first bucket has no lower bound + List firstBucket = new ArrayList<>(3); + firstBucket.add(null); + firstBucket.add(dates[1]); + firstBucket.add(isDate ? dateTimeToLong(dates[0]) : dateNanosToLong(dates[0])); + + // last bucket has no upper bound + List lastBucket = new ArrayList<>(3); + lastBucket.add(dates[3]); + lastBucket.add(null); + lastBucket.add(isDate ? dateTimeToLong(dates[3]) : dateNanosToLong(dates[3])); + + return List.of( + firstBucket, + List.of(dates[1], dates[2], isDate ? dateTimeToLong(dates[1]) : dateNanosToLong(dates[1])), + List.of(dates[2], dates[3], isDate ? dateTimeToLong(dates[2]) : dateNanosToLong(dates[2])), + lastBucket + ); + } + + private static List> numericBuckets(List roundingPoints) { + // sort the rounding points in ascending order + roundingPoints = roundingPoints.stream().sorted().collect(Collectors.toList()); + Object p1 = roundingPoints.get(0); + Object p2 = roundingPoints.get(1); + Object p3 = roundingPoints.get(2); + Object p4 = roundingPoints.get(3); + // first bucket has no lower bound + List firstBucket = new ArrayList<>(3); + firstBucket.add(null); + firstBucket.add(p2); + firstBucket.add(p1); + // last bucket has no upper bound + List lastBucket = new ArrayList<>(3); + lastBucket.add(p4); + lastBucket.add(null); + lastBucket.add(p4); + return List.of(firstBucket, List.of(p2, p3, p2), List.of(p3, p4, p3), lastBucket); + } + + private static SearchStats searchStats() { + // create a SearchStats with min and max in milliseconds + Map minValue = Map.of("date", 1697804103360L); // 2023-10-20T12:15:03.360Z + Map maxValue = Map.of("date", 1698069301543L); // 2023-10-23T13:55:01.543Z + return new EsqlTestUtils.TestSearchStatsWithMinMax(minValue, maxValue); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExecSerializationTests.java deleted file mode 100644 index eb53a57d3bdfb..0000000000000 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExecSerializationTests.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.plan.physical; - -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.TermQueryBuilder; -import org.elasticsearch.xpack.esql.core.expression.Attribute; -import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.Literal; -import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.index.EsIndexSerializationTests; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.xpack.esql.index.EsIndexSerializationTests.randomIndexNameWithModes; - -public class EsQueryExecSerializationTests extends AbstractPhysicalPlanSerializationTests { - public static EsQueryExec randomEsQueryExec() { - return new EsQueryExec( - randomSource(), - randomIdentifier(), - randomFrom(IndexMode.values()), - randomIndexNameWithModes(), - randomFieldAttributes(1, 10, false), - randomQuery(), - new Literal(randomSource(), between(0, Integer.MAX_VALUE), DataType.INTEGER), - EsQueryExec.NO_SORTS, - randomEstimatedRowSize() - ); - } - - public static QueryBuilder randomQuery() { - return randomBoolean() ? new MatchAllQueryBuilder() : new TermQueryBuilder(randomAlphaOfLength(4), randomAlphaOfLength(4)); - } - - @Override - protected EsQueryExec createTestInstance() { - return randomEsQueryExec(); - } - - @Override - protected EsQueryExec mutateInstance(EsQueryExec instance) throws IOException { - String indexPattern = instance.indexPattern(); - IndexMode indexMode = instance.indexMode(); - Map indexNameWithModes = instance.indexNameWithModes(); - List attrs = instance.attrs(); - QueryBuilder query = instance.query(); - Expression limit = instance.limit(); - Integer estimatedRowSize = instance.estimatedRowSize(); - switch (between(0, 6)) { - case 0 -> indexPattern = randomValueOtherThan(indexPattern, EsIndexSerializationTests::randomIdentifier); - case 1 -> indexMode = randomValueOtherThan(indexMode, () -> randomFrom(IndexMode.values())); - case 2 -> indexNameWithModes = randomValueOtherThan(indexNameWithModes, EsIndexSerializationTests::randomIndexNameWithModes); - case 3 -> attrs = randomValueOtherThan(attrs, () -> randomFieldAttributes(1, 10, false)); - case 4 -> query = randomValueOtherThan(query, EsQueryExecSerializationTests::randomQuery); - case 5 -> limit = randomValueOtherThan( - limit, - () -> new Literal(randomSource(), between(0, Integer.MAX_VALUE), DataType.INTEGER) - ); - case 6 -> estimatedRowSize = randomValueOtherThan( - estimatedRowSize, - AbstractPhysicalPlanSerializationTests::randomEstimatedRowSize - ); - } - return new EsQueryExec( - instance.source(), - indexPattern, - indexMode, - indexNameWithModes, - attrs, - query, - limit, - EsQueryExec.NO_SORTS, - estimatedRowSize - ); - } - - @Override - protected boolean alwaysEmptySource() { - return true; - } -} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java index 31b720b65ebac..6430967584553 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProvidersTests.java @@ -91,10 +91,10 @@ record TestCase(QueryBuilder query, List nullsFilteredFields) { IndexMode.STANDARD, Map.of(), List.of(), - testCase.query, null, null, - 10 + 10, + List.of(new EsQueryExec.QueryBuilderAndTags(testCase.query, List.of())) ); FieldExtractExec fieldExtractExec = new FieldExtractExec( Source.EMPTY, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index f820ee636f630..e1ea1bf2fef94 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -137,8 +137,8 @@ public void testLuceneSourceOperatorHugeRowSize() throws IOException { List.of(), null, null, - null, - estimatedRowSize + estimatedRowSize, + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) ) ); assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); @@ -166,10 +166,10 @@ public void testLuceneTopNSourceOperator() throws IOException { IndexMode.STANDARD, index().indexNameWithModes(), List.of(), - null, limit, List.of(sort), - estimatedRowSize + estimatedRowSize, + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) ) ); assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); @@ -197,10 +197,10 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException { IndexMode.STANDARD, index().indexNameWithModes(), List.of(), - null, limit, List.of(sort), - estimatedRowSize + estimatedRowSize, + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) ) ); assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); @@ -223,8 +223,8 @@ public void testDriverClusterAndNodeName() throws IOException { List.of(), null, null, - null, - estimatedRowSize + estimatedRowSize, + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) ) ); assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); @@ -242,8 +242,8 @@ public void testParallel() throws Exception { List.of(), null, null, - null, - between(1, 1000) + between(1, 1000), + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) ); var limitExec = new LimitExec( Source.EMPTY, @@ -279,8 +279,8 @@ private BlockLoader constructBlockLoader() throws IOException { List.of(new FieldAttribute(Source.EMPTY, EsQueryExec.DOC_ID_FIELD.getName(), EsQueryExec.DOC_ID_FIELD)), null, null, - null, - between(1, 1000) + between(1, 1000), + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) ); FieldExtractExec fieldExtractExec = new FieldExtractExec( Source.EMPTY, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java index e24e6adc2becb..12e1b8215d530 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java @@ -16,6 +16,9 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.dissect.DissectParser; import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.esql.core.capabilities.UnresolvedException; @@ -518,6 +521,10 @@ public void accept(Page page) { ); } + if (argClass == EsQueryExec.QueryBuilderAndTags.class) { + return randomQueryBuildAndTags(); + } + try { return mock(argClass); } catch (MockitoException e) { @@ -732,6 +739,14 @@ static EsRelation randomEsRelation() { ); } + static QueryBuilder randomQuery() { + return randomBoolean() ? new MatchAllQueryBuilder() : new TermQueryBuilder(randomAlphaOfLength(4), randomAlphaOfLength(4)); + } + + static EsQueryExec.QueryBuilderAndTags randomQueryBuildAndTags() { + return new EsQueryExec.QueryBuilderAndTags(randomQuery(), List.of(randomBoolean() ? randomLong() : randomDouble())); + } + static FieldAttribute field(String name, DataType type) { return new FieldAttribute( Source.EMPTY,