Skip to content

Commit e6ee2fe

Browse files
authored
Support date trunc in TS (#138947)
With this change, TS will treat date_trunc as equivalent to bucket or tbucket. For example: ``` TS .. | STATS ... BY ... bucket=DATE_TRUNC(1m, @timestamp) ``` is equivalent to ``` TS .. | STATS ... BY ... bucket=TBUCKET(1m) ```
1 parent c7d5389 commit e6ee2fe

File tree

4 files changed

+107
-0
lines changed

4 files changed

+107
-0
lines changed

docs/changelog/138947.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138947
2+
summary: Support date trunc in TS
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.hamcrest.Matchers;
2929
import org.junit.Before;
3030

31+
import java.math.BigDecimal;
32+
import java.math.RoundingMode;
3133
import java.util.ArrayList;
3234
import java.util.Collection;
3335
import java.util.Comparator;
@@ -36,6 +38,7 @@
3638
import java.util.Map;
3739
import java.util.Objects;
3840
import java.util.Set;
41+
import java.util.stream.Collectors;
3942

4043
import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
4144
import static org.hamcrest.Matchers.aMapWithSize;
@@ -881,6 +884,55 @@ record Key(String project, String cluster, String tbucket) {
881884
assertThat(actualAvg, closeTo(expectedAvg, 0.5));
882885
}
883886
}
887+
884888
}
885889

890+
public void testWithDateTrunc() {
891+
try (
892+
var resp1 = run("TS host* | STATS avg(avg_over_time(cpu)) BY cluster, TBUCKET(1minute)");
893+
var resp2 = run("TS host* | EVAL tbucket = DATE_TRUNC(1m, @timestamp) | STATS avg(avg_over_time(cpu)) BY cluster, tbucket");
894+
) {
895+
record Key(String cluster, String tbucket) {
896+
897+
}
898+
Map<Key, Double> results1 = EsqlTestUtils.getValuesList(resp1)
899+
.stream()
900+
.collect(Collectors.toMap(e -> new Key((String) e.get(1), (String) e.get(2)), e -> round((Double) e.get(0))));
901+
Map<Key, Double> results2 = EsqlTestUtils.getValuesList(resp2)
902+
.stream()
903+
.collect(Collectors.toMap(e -> new Key((String) e.get(1), (String) e.get(2)), e -> round((Double) e.get(0))));
904+
assertThat(results1, equalTo(results2));
905+
}
906+
try (
907+
var resp1 = run("TS host* | STATS max(avg_over_time(cpu)) BY TBUCKET(1minute)");
908+
var resp2 = run("TS host* | EVAL tbucket = DATE_TRUNC(1m, @timestamp) | STATS max(avg_over_time(cpu)) BY tbucket");
909+
) {
910+
Map<String, Double> results1 = EsqlTestUtils.getValuesList(resp1)
911+
.stream()
912+
.collect(Collectors.toMap(e -> (String) e.get(1), e -> round((Double) e.get(0))));
913+
Map<String, Double> results2 = EsqlTestUtils.getValuesList(resp2)
914+
.stream()
915+
.collect(Collectors.toMap(e -> (String) e.get(1), e -> round((Double) e.get(0))));
916+
assertThat(results1, equalTo(results2));
917+
}
918+
try (
919+
var resp1 = run("TS host* | STATS avg(avg_over_time(cpu, 5m)) BY cluster, TBUCKET(1minute)");
920+
var resp2 = run("TS host* | EVAL tbucket = DATE_TRUNC(1m, @timestamp) | STATS avg(avg_over_time(cpu, 5m)) BY cluster, tbucket");
921+
) {
922+
record Key(String cluster, String tbucket) {
923+
924+
}
925+
Map<Key, Double> results1 = EsqlTestUtils.getValuesList(resp1)
926+
.stream()
927+
.collect(Collectors.toMap(e -> new Key((String) e.get(1), (String) e.get(2)), e -> round((Double) e.get(0))));
928+
Map<Key, Double> results2 = EsqlTestUtils.getValuesList(resp2)
929+
.stream()
930+
.collect(Collectors.toMap(e -> new Key((String) e.get(1), (String) e.get(2)), e -> round((Double) e.get(0))));
931+
assertThat(results1, equalTo(results2));
932+
}
933+
}
934+
935+
private static double round(double value) {
936+
return new BigDecimal(value).setScale(6, RoundingMode.HALF_UP).doubleValue();
937+
}
886938
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
3131
import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket;
3232
import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket;
33+
import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc;
3334
import org.elasticsearch.xpack.esql.expression.function.scalar.histogram.ExtractHistogramComponent;
3435
import org.elasticsearch.xpack.esql.expression.function.scalar.internal.PackDimension;
3536
import org.elasticsearch.xpack.esql.expression.function.scalar.internal.UnpackDimension;
@@ -296,6 +297,19 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
296297
}
297298
Bucket bucket = (Bucket) tbucket.surrogate();
298299
timeBucketRef.set(new Alias(e.source(), bucket.functionName(), bucket, e.id()));
300+
} else if (child instanceof DateTrunc dateTrunc && dateTrunc.field().equals(timestamp.get())) {
301+
if (timeBucketRef.get() != null) {
302+
throw new IllegalArgumentException("expected at most one time bucket");
303+
}
304+
Bucket bucket = new Bucket(
305+
dateTrunc.source(),
306+
dateTrunc.field(),
307+
dateTrunc.interval(),
308+
null,
309+
null,
310+
dateTrunc.configuration()
311+
);
312+
timeBucketRef.set(new Alias(e.source(), bucket.functionName(), bucket, e.id()));
299313
}
300314
}
301315
});

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToInteger;
7070
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToLong;
7171
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToString;
72+
import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc;
7273
import org.elasticsearch.xpack.esql.expression.function.scalar.histogram.ExtractHistogramComponent;
7374
import org.elasticsearch.xpack.esql.expression.function.scalar.histogram.HistogramPercentile;
7475
import org.elasticsearch.xpack.esql.expression.function.scalar.internal.PackDimension;
@@ -7816,6 +7817,41 @@ public void testTranslateWithInlineFilter() {
78167817
assertThat(lastOverTime.filter(), instanceOf(Equals.class));
78177818
}
78187819

7820+
public void testTranslateWithDateTrunc() {
7821+
var query = randomFrom("""
7822+
TS k8s
7823+
| EVAL tbucket = date_trunc(1m, @timestamp)
7824+
| STATS sum(last_over_time(network.bytes_in)) WHERE cluster == "prod" BY tbucket
7825+
| LIMIT 10
7826+
""", """
7827+
TS k8s
7828+
| STATS sum(last_over_time(network.bytes_in)) WHERE cluster == "prod" BY tbucket=date_trunc(1m, @timestamp)
7829+
| LIMIT 10
7830+
""");
7831+
var plan = logicalOptimizerWithLatestVersion.optimize(metricsAnalyzer.analyze(parser.createStatement(query)));
7832+
var limit = as(plan, Limit.class);
7833+
Aggregate finalAgg = as(limit.child(), Aggregate.class);
7834+
assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class)));
7835+
TimeSeriesAggregate aggsByTsid = as(finalAgg.child(), TimeSeriesAggregate.class);
7836+
assertNotNull(aggsByTsid.timeBucket());
7837+
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(1)));
7838+
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
7839+
assertThat(evalBucket.fields(), hasSize(1));
7840+
EsRelation relation = as(evalBucket.child(), EsRelation.class);
7841+
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
7842+
7843+
var sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
7844+
assertFalse(sum.hasFilter());
7845+
7846+
LastOverTime lastOverTime = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), LastOverTime.class);
7847+
assertThat(Expressions.attribute(lastOverTime.field()).name(), equalTo("network.bytes_in"));
7848+
assertThat(Expressions.attribute(aggsByTsid.groupings().get(1)).id(), equalTo(evalBucket.fields().get(0).id()));
7849+
DateTrunc dateTrunc = as(Alias.unwrap(evalBucket.fields().get(0)), DateTrunc.class);
7850+
assertThat(Expressions.attribute(dateTrunc.field()).name(), equalTo("@timestamp"));
7851+
assertTrue(lastOverTime.hasFilter());
7852+
assertThat(lastOverTime.filter(), instanceOf(Equals.class));
7853+
}
7854+
78197855
public void testTranslateWithInlineFilterWithImplicitLastOverTime() {
78207856
var query = """
78217857
TS k8s | STATS avg(network.bytes_in) WHERE cluster == "prod" BY bucket(@timestamp, 1 minute)

0 commit comments

Comments
 (0)