-
Notifications
You must be signed in to change notification settings - Fork 25.6k
[ES|QL] Substitute date_trunc with round_to when the pre-calculated rounding points are available #128639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ES|QL] Substitute date_trunc with round_to when the pre-calculated rounding points are available #128639
Changes from 22 commits
b677c8f
08c3d18
3f96c15
c3ffa8f
f78fa18
88dbdf8
511faeb
3d28f88
3f1cd81
e59ea5a
2f1aef9
debd20e
5d6d9c9
5a152ab
7b669b8
54567d4
3208b5f
ee24a7e
a76362f
57dfe15
d42ea8e
89039cd
1d453f3
4e82da7
a2b80be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 128639 | ||
| summary: Substitue `date_trunc` with `round_to` when the pre-calculated rounding points | ||
| are available | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,38 +16,53 @@ | |
| import org.elasticsearch.compute.ann.Fixed; | ||
| import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.logging.LogManager; | ||
| import org.elasticsearch.logging.Logger; | ||
| import org.elasticsearch.xpack.esql.core.expression.Expression; | ||
| import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; | ||
| import org.elasticsearch.xpack.esql.core.expression.FoldContext; | ||
| import org.elasticsearch.xpack.esql.core.expression.Literal; | ||
| import org.elasticsearch.xpack.esql.core.tree.NodeInfo; | ||
| import org.elasticsearch.xpack.esql.core.tree.Source; | ||
| import org.elasticsearch.xpack.esql.core.type.DataType; | ||
| import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField; | ||
| import org.elasticsearch.xpack.esql.expression.SurrogateExpression; | ||
| import org.elasticsearch.xpack.esql.expression.function.Example; | ||
| import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; | ||
| import org.elasticsearch.xpack.esql.expression.function.Param; | ||
| import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; | ||
| import org.elasticsearch.xpack.esql.expression.function.scalar.math.RoundTo; | ||
| import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; | ||
| import org.elasticsearch.xpack.esql.stats.SearchStats; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.Duration; | ||
| import java.time.Period; | ||
| import java.time.ZoneId; | ||
| import java.time.ZoneOffset; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST; | ||
| import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND; | ||
| import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; | ||
| import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME; | ||
| import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS; | ||
| import static org.elasticsearch.xpack.esql.core.type.DataType.isDateTime; | ||
| import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateWithTypeToString; | ||
|
|
||
| public class DateTrunc extends EsqlScalarFunction { | ||
| public class DateTrunc extends EsqlScalarFunction implements SurrogateExpression { | ||
| public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( | ||
| Expression.class, | ||
| "DateTrunc", | ||
| DateTrunc::new | ||
| ); | ||
|
|
||
| private static final Logger logger = LogManager.getLogger(DateTrunc.class); | ||
|
|
||
| @FunctionalInterface | ||
| public interface DateTruncFactoryProvider { | ||
| ExpressionEvaluator.Factory apply(Source source, ExpressionEvaluator.Factory lhs, Rounding.Prepared rounding); | ||
|
|
@@ -163,14 +178,23 @@ static Rounding.Prepared createRounding(final Object interval) { | |
|
|
||
| public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone) { | ||
| if (interval instanceof Period period) { | ||
| return createRounding(period, timeZone); | ||
| return createRounding(period, timeZone, null, null); | ||
| } else if (interval instanceof Duration duration) { | ||
| return createRounding(duration, timeZone); | ||
| return createRounding(duration, timeZone, null, null); | ||
| } | ||
| throw new IllegalArgumentException("Time interval is not supported"); | ||
| } | ||
|
|
||
| private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone) { | ||
| public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone, Long min, Long max) { | ||
| if (interval instanceof Period period) { | ||
| return createRounding(period, timeZone, min, max); | ||
| } else if (interval instanceof Duration duration) { | ||
| return createRounding(duration, timeZone, min, max); | ||
| } | ||
| throw new IllegalArgumentException("Time interval is not supported"); | ||
| } | ||
|
|
||
| private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone, Long min, Long max) { | ||
| // Zero or negative intervals are not supported | ||
| if (period == null || period.isNegative() || period.isZero()) { | ||
| throw new IllegalArgumentException("Zero or negative time interval is not supported"); | ||
|
|
@@ -182,6 +206,7 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI | |
| } | ||
|
|
||
| final Rounding.Builder rounding; | ||
| boolean tryPrepareWithMinMax = true; | ||
| if (period.getDays() == 1) { | ||
| rounding = new Rounding.Builder(Rounding.DateTimeUnit.DAY_OF_MONTH); | ||
| } else if (period.getDays() == 7) { | ||
|
|
@@ -190,6 +215,7 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI | |
| rounding = new Rounding.Builder(Rounding.DateTimeUnit.WEEK_OF_WEEKYEAR); | ||
| } else if (period.getDays() > 1) { | ||
| rounding = new Rounding.Builder(new TimeValue(period.getDays(), TimeUnit.DAYS)); | ||
| tryPrepareWithMinMax = false; | ||
| } else if (period.getMonths() == 3) { | ||
| // java.time.Period does not have a QUARTERLY period, so a period of 3 months | ||
| // returns a quarterly rounding | ||
|
|
@@ -198,26 +224,36 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI | |
| rounding = new Rounding.Builder(Rounding.DateTimeUnit.MONTH_OF_YEAR); | ||
| } else if (period.getMonths() > 0) { | ||
| rounding = new Rounding.Builder(Rounding.DateTimeUnit.MONTHS_OF_YEAR, period.getMonths()); | ||
| tryPrepareWithMinMax = false; | ||
| } else if (period.getYears() == 1) { | ||
| rounding = new Rounding.Builder(Rounding.DateTimeUnit.YEAR_OF_CENTURY); | ||
| } else if (period.getYears() > 0) { | ||
| rounding = new Rounding.Builder(Rounding.DateTimeUnit.YEARS_OF_CENTURY, period.getYears()); | ||
| tryPrepareWithMinMax = false; | ||
| } else { | ||
| throw new IllegalArgumentException("Time interval is not supported"); | ||
| } | ||
|
|
||
| rounding.timeZone(timeZone); | ||
| if (min != null && max != null && tryPrepareWithMinMax) { | ||
| // Multiple quantities calendar interval - day/week/month/quarter/year is not supported by PreparedRounding.maybeUseArray, | ||
| // which is called by prepare(min, max), as it may hit an assert. Call prepare(min, max) only for single calendar interval. | ||
| return rounding.build().prepare(min, max); | ||
| } | ||
| return rounding.build().prepareForUnknown(); | ||
| } | ||
|
|
||
| private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone) { | ||
| private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone, Long min, Long max) { | ||
| // Zero or negative intervals are not supported | ||
| if (duration == null || duration.isNegative() || duration.isZero()) { | ||
| throw new IllegalArgumentException("Zero or negative time interval is not supported"); | ||
| } | ||
|
|
||
| final Rounding.Builder rounding = new Rounding.Builder(TimeValue.timeValueMillis(duration.toMillis())); | ||
| rounding.timeZone(timeZone); | ||
| if (min != null && max != null) { | ||
| return rounding.build().prepare(min, max); | ||
| } | ||
| return rounding.build().prepareForUnknown(); | ||
| } | ||
|
|
||
|
|
@@ -249,4 +285,43 @@ public static ExpressionEvaluator.Factory evaluator( | |
| ) { | ||
| return evaluatorMap.get(forType).apply(source, fieldEvaluator, rounding); | ||
| } | ||
|
|
||
| @Override | ||
| public Expression surrogate() { // there is no substitute without SearchStats | ||
fang-xing-esql marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public Expression surrogate(SearchStats searchStats) { | ||
fang-xing-esql marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (field() instanceof FieldAttribute fa && fa.field() instanceof MultiTypeEsField == false && isDateTime(fa.dataType())) { | ||
fang-xing-esql marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // Extract min/max from SearchStats | ||
| DataType fieldType = fa.dataType(); | ||
| FieldAttribute.FieldName fieldName = fa.fieldName(); | ||
| var min = searchStats.min(fieldName); | ||
| var max = searchStats.max(fieldName); | ||
| // If min/max is available create rounding with them | ||
| if (min instanceof Long minValue && max instanceof Long maxValue && interval().foldable()) { | ||
|
||
| Object foldedInterval = interval().fold(FoldContext.small() /* TODO remove me */); | ||
| Rounding.Prepared rounding = createRounding(foldedInterval, DEFAULT_TZ, minValue, maxValue); | ||
fang-xing-esql marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| long[] roundingPoints = rounding.fixedRoundingPoints(); | ||
| if (roundingPoints == null) { | ||
| logger.trace( | ||
| "Fixed rounding point is null for field {}, minValue {} in string format {} and maxValue {} in string format {}", | ||
| fieldName, | ||
| minValue, | ||
| dateWithTypeToString(minValue, fieldType), | ||
| maxValue, | ||
| dateWithTypeToString(maxValue, fieldType) | ||
| ); | ||
| return null; | ||
| } | ||
| // Convert to round_to function with the roundings | ||
| List<Expression> points = Arrays.stream(roundingPoints) | ||
| .mapToObj(l -> new Literal(Source.EMPTY, l, fieldType)) | ||
| .collect(Collectors.toList()); | ||
| return new RoundTo(source(), field(), points); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.