Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
63ae2e5
Rewrite RoundTo to QueryAndTags
fang-xing-esql Aug 6, 2025
59edd23
Update docs/changelog/132512.yaml
fang-xing-esql Aug 6, 2025
533ba43
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 6, 2025
b135998
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 6, 2025
59fef5d
local physical planner tests
fang-xing-esql Aug 8, 2025
9b8798b
more tests of push down functions
fang-xing-esql Aug 8, 2025
35c6ef2
refactor and more comments
fang-xing-esql Aug 8, 2025
00ca0ba
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 8, 2025
9afab26
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 8, 2025
c03ef46
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 11, 2025
9fa959a
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 14, 2025
a2e022b
refactor EsQueryExec, merge query into queryBuilderAndTags, remove se…
fang-xing-esql Aug 14, 2025
5720174
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 14, 2025
5976fbc
fix test
fang-xing-esql Aug 15, 2025
a936828
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 15, 2025
55fb0ca
more test
fang-xing-esql Aug 15, 2025
c8d4fe4
Merge branch 'pushdown-roundto' of github.com:fang-xing-esql/Elastics…
fang-xing-esql Aug 15, 2025
89a8686
update test
fang-xing-esql Aug 15, 2025
bced1e6
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 15, 2025
04c733e
more tests for lookup join and fork even they are not supported yet
fang-xing-esql Aug 15, 2025
dde4996
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 15, 2025
41a51f5
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 18, 2025
91179dd
update according to review comment
fang-xing-esql Aug 18, 2025
edbd545
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 18, 2025
bdece4f
update according to review comments
fang-xing-esql Aug 20, 2025
e2c7666
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 20, 2025
c23d2a4
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 20, 2025
d43363e
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 21, 2025
72ad539
upper limit on the number of rounding points when converting roundTo …
fang-xing-esql Aug 21, 2025
1c70e36
add ROUNDTO_PUSHDOWN_THRESHOLD in QueryParagmas and EsqlFlags to cont…
fang-xing-esql Aug 22, 2025
cd07cb5
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 22, 2025
757e929
update after merging main
fang-xing-esql Aug 23, 2025
91cd778
Merge branch 'main' into pushdown-roundto
fang-xing-esql Aug 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132512.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132512
summary: Rewrite `RoundTo` to `QueryAndTags`
area: ES|QL
type: enhancement
issues: []

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isFoldable;
Expand All @@ -39,6 +40,7 @@
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
import static org.elasticsearch.xpack.esql.core.type.DataType.LONG;
import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;
import static org.elasticsearch.xpack.esql.core.type.DataTypeConverter.safeToLong;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.commonType;

/**
Expand Down Expand Up @@ -181,7 +183,8 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
ExpressionEvaluator.Factory field = toEvaluator.apply(field());
field = Cast.cast(source(), field().dataType(), dataType, field);
List<Object> points = Iterators.toList(Iterators.map(points().iterator(), p -> Foldables.valueOf(toEvaluator.foldCtx(), p)));
return build.build(source(), field, points);
List<Object> sortedPoints = sortedRoundingPoints(points, dataType); // provide sorted points to the evaluator
return build.build(source(), field, sortedPoints);
}

interface Build {
Expand All @@ -195,4 +198,19 @@ interface Build {
Map.entry(LONG, RoundToLong.BUILD),
Map.entry(DOUBLE, RoundToDouble.BUILD)
);

public static List<Object> sortedRoundingPoints(List<Object> points, DataType dataType) {
return points.stream().filter(Objects::nonNull).map(p -> switch (dataType) { // the types are in sync with SIGNATURES
case INTEGER -> ((Number) p).intValue();
case DOUBLE -> ((Number) p).doubleValue();
case LONG, DATETIME, DATE_NANOS -> safeToLong((Number) p);
default -> throw new IllegalArgumentException("Unsupported data type: " + dataType);
}).sorted((a, b) -> {
if (a instanceof Double || b instanceof Double) {
return Double.compare(a.doubleValue(), b.doubleValue());
} else {
return Long.compare(a.longValue(), b.longValue());
}
}).collect(java.util.stream.Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.util.Arrays;
class RoundTo$Type$ {
static final RoundTo.Build BUILD = (source, field, points) -> {
$type$[] f = points.stream().mapTo$Type$(p -> ((Number) p).$type$Value()).toArray();
Arrays.sort(f);
return switch (f.length) {
// TODO should be a consistent way to do the 0 version - is CASE(MV_COUNT(f) == 1, f[0])
case 1 -> new RoundTo$Type$1Evaluator.Factory(source, field, f[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushStatsToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushTopNToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceRoundToWithQueryAndTags;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialDocValuesExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialShapeBoundsExtraction;
Expand Down Expand Up @@ -60,7 +61,7 @@ protected List<Batch<PhysicalPlan>> batches() {
}

protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(6);
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(7);
esSourceRules.add(new ReplaceSourceAttributes());
if (optimizeForEsSource) {
esSourceRules.add(new PushTopNToSource());
Expand All @@ -74,6 +75,20 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
// execute the rules multiple times to improve the chances of things being pushed down
@SuppressWarnings("unchecked")
var pushdown = new Batch<PhysicalPlan>("Push to ES", esSourceRules.toArray(Rule[]::new));
List<Rule<?, PhysicalPlan>> substitutionRules = new ArrayList<>(1);
if (optimizeForEsSource) {
substitutionRules.add(new ReplaceRoundToWithQueryAndTags());
}
// execute the SubstituteRoundToWithQueryAndTags rule once after all the other pushdown rules are applied, as this rule generate
// multiple QueryBuilders according the number of RoundTo points, it should be applied after all the other eligible pushdowns are
// done, and it should be executed only once.
@SuppressWarnings("unchecked")
var substituteRoundToWithQueryAndTags = new Batch<PhysicalPlan>(
"Substitute RoundTo with QueryAndTags",
Limiter.ONCE,
substitutionRules.toArray(Rule[]::new)
);

// add the field extraction in just one pass
// add it at the end after all the other rules have ran
var fieldExtraction = new Batch<>(
Expand All @@ -84,6 +99,6 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
new SpatialShapeBoundsExtraction(),
new ParallelizeTimeSeriesSource()
);
return List.of(pushdown, fieldExtraction);
return List.of(pushdown, substituteRoundToWithQueryAndTags, fieldExtraction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ private static PhysicalPlan rewrite(
queryExec.indexMode(),
queryExec.indexNameWithModes(),
queryExec.output(),
query,
queryExec.limit(),
queryExec.sorts(),
queryExec.estimatedRowSize()
queryExec.estimatedRowSize(),
List.of(new EsQueryExec.QueryBuilderAndTags(query, List.of()))
);
// If the eval contains other aliases, not just field attributes, we need to keep them in the plan
PhysicalPlan plan = evalFields.isEmpty() ? queryExec : new EvalExec(filterExec.source(), queryExec, evalFields);
Expand Down
Loading