Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/135987.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135987
summary: Avoid rewriting `round_to` with expensive queries
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@

package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.FuzzyQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.MultiTermQueryBuilder;
import org.elasticsearch.index.query.PrefixQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.RegexpQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.esql.core.expression.Alias;
Expand All @@ -30,6 +41,8 @@
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
import org.elasticsearch.xpack.esql.stats.SearchStats;

import java.time.ZoneId;
import java.util.ArrayList;
Expand Down Expand Up @@ -275,7 +288,12 @@ protected PhysicalPlan rule(EvalExec evalExec, LocalPhysicalOptimizerContext ctx
if (roundTos.size() == 1) {
RoundTo roundTo = roundTos.get(0);
int count = roundTo.points().size();
int roundingPointsUpperLimit = roundingPointsThreshold(ctx);
int roundingPointsUpperLimit = adjustedRoundingPointsThreshold(
ctx.searchStats(),
roundingPointsThreshold(ctx),
queryExec.query(),
queryExec.indexMode()
);
if (count > roundingPointsUpperLimit) {
logger.debug(
"Skipping RoundTo push down for [{}], as it has [{}] points, which is more than [{}]",
Expand Down Expand Up @@ -485,4 +503,63 @@ private int roundingPointsThreshold(LocalPhysicalOptimizerContext ctx) {
}
return roundingPointsThreshold;
}

/**
* If the main query is expensive (such as including wildcard queries), executing more queries with tags is slower and more costly
* than executing fewer queries without tags and then reading points and rounding. The rounding points threshold is treated as the
* maximum number of clauses allowed to execute. We estimate the number of clauses in the main query and adjust the threshold so
* that the total number of clauses does not exceed the limit by too much. Some expensive queries count as more than one clause;
* for example, a wildcard query counts as 5 clauses, and a terms query counts as the number of terms.
*/
static int adjustedRoundingPointsThreshold(SearchStats stats, int threshold, QueryBuilder query, IndexMode indexMode) {
int clauses = estimateQueryClauses(stats, query) + 1;
if (indexMode == IndexMode.TIME_SERIES) {
// No doc partitioning for time_series sources; increase the threshold to trade overhead for parallelism.
threshold *= 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: conside adding constants for these numbers (2, 5 etc).

}
return Math.ceilDiv(threshold, clauses);
}

static int estimateQueryClauses(SearchStats stats, QueryBuilder q) {
if (q == null || q instanceof MatchAllQueryBuilder || q instanceof MatchNoneQueryBuilder) {
return 0;
}
if (q instanceof WildcardQueryBuilder
|| q instanceof RegexpQueryBuilder
|| q instanceof PrefixQueryBuilder
|| q instanceof FuzzyQueryBuilder) {
return 5;
}
if (q instanceof RangeQueryBuilder r) {
// with points count 1, without count 3
return stats.min(new FieldAttribute.FieldName(r.fieldName())) != null ? 1 : 3;
}
if (q instanceof MultiTermQueryBuilder) {
return 3;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also score phrase queries differently?

if (q instanceof TermsQueryBuilder terms && terms.values() != null) {
return terms.values().size();
}
if (q instanceof SingleValueQuery.Builder b) {
// ignore the single_value clause
return Math.max(1, estimateQueryClauses(stats, b.next()));
}
if (q instanceof BoolQueryBuilder bq) {
int total = 0;
for (var c : bq.filter()) {
total += estimateQueryClauses(stats, c);
}
for (var c : bq.must()) {
total += estimateQueryClauses(stats, c);
}
for (var c : bq.should()) {
total += estimateQueryClauses(stats, c);
}
for (var c : bq.mustNot()) {
total += Math.max(2, estimateQueryClauses(stats, c));
}
return total;
}
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
import static org.elasticsearch.compute.aggregation.AggregatorMode.SINGLE;
Expand All @@ -67,6 +68,7 @@
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.DEFAULT_DATE_TIME_FORMATTER;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateNanosToLong;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToLong;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

//@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
Expand Down Expand Up @@ -537,6 +539,74 @@ public void testRoundToTransformToQueryAndTagsWithCustomizedUpperLimit() {
}
}

/**
 * Builds the comma-separated list {@code 0,1,...,numPoints-1} used as the rounding points argument in test queries.
 *
 * @param numPoints how many consecutive integers, starting at 0, to emit
 * @return the integers joined by {@code ","} with no spaces; an empty string when {@code numPoints} is not positive
 */
static String pointArray(int numPoints) {
    StringBuilder csv = new StringBuilder();
    for (int point = 0; point < numPoints; point++) {
        if (point > 0) {
            csv.append(',');
        }
        csv.append(point);
    }
    return csv.toString();
}

/**
 * Returns the number of query-and-tags entries on the first {@link EsQueryExec} collected from the given plan.
 *
 * @param plan the physical plan to inspect
 * @return the size of that node's {@code queryBuilderAndTags()} list
 */
static int queryAndTags(PhysicalPlan plan) {
    var esQueryNodes = plan.collectFirstChildren(EsQueryExec.class::isInstance);
    EsQueryExec firstEsQuery = (EsQueryExec) esQueryNodes.getFirst();
    return firstEsQuery.queryBuilderAndTags().size();
}

/**
 * Verifies how the number of rounding points that still gets rewritten into query-and-tags entries
 * changes as filters are added to the main query.
 */
public void testAdjustThresholdForQueries() {
    // No filter: the rewrite is expected for every point count drawn from this range.
    int unfilteredPoints = between(2, 127);
    int unfilteredCount = countQueryAndTagsFor("""
        from test
        | stats count(*) by x = round_to(integer, %s)
        """, unfilteredPoints);
    assertThat(unfilteredCount, equalTo(unfilteredPoints + 1)); // include null bucket

    // Date range filter, point count in the range still expected to be rewritten.
    int rangeRewrittenPoints = between(2, 64);
    int rangeRewrittenCount = countQueryAndTagsFor("""
        from test
        | where date >= "2023-10-19"
        | stats count(*) by x = round_to(integer, %s)
        """, rangeRewrittenPoints);
    assertThat(rangeRewrittenCount, equalTo(rangeRewrittenPoints + 1)); // include null bucket

    // Same date range filter, point count in the range expected to be left alone.
    int rangeSkippedPoints = between(65, 128);
    int rangeSkippedCount = countQueryAndTagsFor("""
        from test
        | where date >= "2023-10-19"
        | stats count(*) by x = round_to(integer, %s)
        """, rangeSkippedPoints);
    assertThat(rangeSkippedCount, equalTo(1)); // no rewrite

    // Date range plus a LIKE filter, point count in the range still expected to be rewritten.
    int likeRewrittenPoints = between(2, 19);
    int likeRewrittenCount = countQueryAndTagsFor("""
        from test
        | where date >= "2023-10-19"
        | where keyword LIKE "w*"
        | stats count(*) by x = round_to(integer, %s)
        """, likeRewrittenPoints);
    assertThat("points=" + likeRewrittenPoints, likeRewrittenCount, equalTo(likeRewrittenPoints + 1)); // include null bucket

    // Date range plus a LIKE filter, point count in the range expected to be left alone.
    int likeSkippedPoints = between(20, 128);
    int likeSkippedCount = countQueryAndTagsFor("""
        from test
        | where date >= "2023-10-19"
        | where keyword LIKE "*w*"
        | stats count(*) by x = round_to(integer, %s)
        """, likeSkippedPoints);
    assertThat("points=" + likeSkippedPoints, likeSkippedCount, equalTo(1)); // no rewrite
}

/**
 * Fills the {@code %s} placeholder of the query template with {@code points} rounding points, plans the query
 * against the {@code mapping-all-types.json} mapping and returns the resulting query-and-tags count.
 */
private int countQueryAndTagsFor(String queryTemplate, int points) {
    String esql = String.format(Locale.ROOT, queryTemplate, pointArray(points));
    PhysicalPlan plan = plannerOptimizer.plan(esql, searchStats, makeAnalyzer("mapping-all-types.json"));
    return queryAndTags(plan);
}

private static void verifyQueryAndTags(List<EsQueryExec.QueryBuilderAndTags> expected, List<EsQueryExec.QueryBuilderAndTags> actual) {
assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++) {
Expand Down