diff --git a/server/src/main/resources/transport/definitions/referable/esql_bucket_offset.csv b/server/src/main/resources/transport/definitions/referable/esql_bucket_offset.csv new file mode 100644 index 0000000000000..3f3c217b1676b --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/esql_bucket_offset.csv @@ -0,0 +1 @@ +9327000 diff --git a/server/src/main/resources/transport/upper_bounds/9.4.csv b/server/src/main/resources/transport/upper_bounds/9.4.csv index f3d2516de998c..a6e771dee8eeb 100644 --- a/server/src/main/resources/transport/upper_bounds/9.4.csv +++ b/server/src/main/resources/transport/upper_bounds/9.4.csv @@ -1 +1 @@ -azure_openai_oauth_settings,9326000 +esql_bucket_offset,9327000 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java index 840e3849fa7c2..e3a922dbc8d83 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.expression.function.grouping; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.Rounding; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -67,6 +68,7 @@ public class Bucket extends GroupingFunction.EvaluatableGroupingFunction TwoOptionalArguments, ConfigurationFunction { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Bucket", Bucket::new); + public static final TransportVersion ESQL_BUCKET_OFFSET = TransportVersion.fromName("esql_bucket_offset"); // TODO maybe we should just cover the whole of representable dates here - like ten years, 100 years, 1000 years, all the way up. // That way you never end up with more than the target number of buckets. @@ -113,6 +115,7 @@ public Rounding apply(ZoneId zoneId) { private final Expression buckets; private final Expression from; private final Expression to; + private final long offset; @FunctionInfo( returnType = { "double", "date", "date_nanos" }, @@ -233,6 +236,18 @@ public Bucket( description = "End of the range. Can be a number, a date or a date expressed as a string." ) Expression to, Configuration configuration + ) { + this(source, field, buckets, from, to, configuration, 0L); + } + + public Bucket( + Source source, + Expression field, + Expression buckets, + Expression from, + Expression to, + Configuration configuration, + long offset ) { super(source, fields(field, buckets, from, to)); this.field = field; @@ -240,6 +255,7 @@ public Bucket( this.from = from; this.to = to; this.configuration = configuration; + this.offset = offset; } private Bucket(StreamInput in) throws IOException { @@ -249,7 +265,8 @@ private Bucket(StreamInput in) throws IOException { in.readNamedWriteable(Expression.class), in.readOptionalNamedWriteable(Expression.class), in.readOptionalNamedWriteable(Expression.class), - ((PlanStreamInput) in).configuration() + ((PlanStreamInput) in).configuration(), + in.getTransportVersion().supports(ESQL_BUCKET_OFFSET) ? in.readZLong() : 0L ); } @@ -273,6 +290,16 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteable(buckets); out.writeOptionalNamedWriteable(from); out.writeOptionalNamedWriteable(to); + TransportVersion transportVersion = out.getTransportVersion(); + if (transportVersion.supports(ESQL_BUCKET_OFFSET)) { + out.writeZLong(offset); + } else if (offset != 0L) { + throw new EsqlIllegalArgumentException( + "bucket with offset is not supported in peer node's version [{}]. Upgrade to version [{}] or newer.", + transportVersion, + ESQL_BUCKET_OFFSET + ); + } } @Override @@ -334,16 +361,16 @@ public Rounding.Prepared getDateRounding(FoldContext foldContext, Long min, Long long f = foldToLong(foldContext, from); long t = foldToLong(foldContext, to); if (min != null && max != null) { - return new DateRoundingPicker(b, f, t, configuration.zoneId()).pickRounding().prepare(min, max); + return new DateRoundingPicker(b, f, t, configuration.zoneId(), offset).pickRounding().prepare(min, max); } - return new DateRoundingPicker(b, f, t, configuration.zoneId()).pickRounding().prepareForUnknown(); + return new DateRoundingPicker(b, f, t, configuration.zoneId(), offset).pickRounding().prepareForUnknown(); } else { assert DataType.isTemporalAmount(buckets.dataType()) : "Unexpected span data type [" + buckets.dataType() + "]"; - return DateTrunc.createRounding(buckets.fold(foldContext), configuration.zoneId(), min, max); + return DateTrunc.createRounding(buckets.fold(foldContext), configuration.zoneId(), min, max, offset); } } - private record DateRoundingPicker(int buckets, long from, long to, ZoneId zoneId) { + private record DateRoundingPicker(int buckets, long from, long to, ZoneId zoneId, long offset) { Rounding pickRounding() { Rounding best = findLastOk(DAY_OF_MONTH_OR_FINER); if (best != null) { @@ -511,12 +538,12 @@ public DataType dataType() { public Expression replaceChildren(List newChildren) { Expression from = newChildren.size() > 2 ? newChildren.get(2) : null; Expression to = newChildren.size() > 3 ? newChildren.get(3) : null; - return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to, configuration); + return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to, configuration, offset); } @Override protected NodeInfo info() { - return NodeInfo.create(this, Bucket::new, field, buckets, from, to, configuration); + return NodeInfo.create(this, Bucket::new, field, buckets, from, to, configuration, offset); } public Expression field() { @@ -535,14 +562,18 @@ public Expression to() { return to; } + public long offset() { + return offset; + } + @Override public String toString() { - return "Bucket{" + "field=" + field + ", buckets=" + buckets + ", from=" + from + ", to=" + to + '}'; + return "Bucket{" + "field=" + field + ", buckets=" + buckets + ", from=" + from + ", to=" + to + ", offset=" + offset + '}'; } @Override public int hashCode() { - return Objects.hash(getClass(), children(), configuration); + return Objects.hash(getClass(), children(), configuration, offset); } @Override @@ -552,6 +583,6 @@ public boolean equals(Object obj) { } Bucket other = (Bucket) obj; - return configuration.equals(other.configuration); + return configuration.equals(other.configuration) && offset == other.offset; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/date/DateTrunc.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/date/DateTrunc.java index 5c89164899766..4ce6784009d8f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/date/DateTrunc.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/date/DateTrunc.java @@ -168,15 +168,19 @@ public boolean foldable() { } public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone, Long min, Long max) { + return createRounding(interval, timeZone, min, max, 0L); + } + + public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone, Long min, Long max, long offset) { if (interval instanceof Period period) { - return createRounding(period, timeZone, min, max); + return createRounding(period, timeZone, min, max, offset); } else if (interval instanceof Duration duration) { - return createRounding(duration, timeZone, min, max); + return createRounding(duration, timeZone, min, max, offset); } throw new IllegalArgumentException("Time interval is not supported"); } - private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone, Long min, Long max) { + private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone, Long min, Long max, long offset) { // Zero or negative intervals are not supported if (period == null || period.isNegative() || period.isZero()) { throw new IllegalArgumentException("Zero or negative time interval is not supported"); @@ -217,6 +221,7 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI } rounding.timeZone(timeZone); + rounding.offset(offset); if (min != null && max != null && tryPrepareWithMinMax) { // Multiple quantities calendar interval - day/week/month/quarter/year is not supported by PreparedRounding.maybeUseArray, // which is called by prepare(min, max), as it may hit an assert. Call prepare(min, max) only for single calendar interval. @@ -225,7 +230,7 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI return rounding.build().prepareForUnknown(); } - private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone, Long min, Long max) { + private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone, Long min, Long max, long offset) { // Zero or negative intervals are not supported if (duration == null || duration.isNegative() || duration.isZero()) { throw new IllegalArgumentException("Zero or negative time interval is not supported"); @@ -233,6 +238,7 @@ private static Rounding.Prepared createRounding(final Duration duration, final Z final Rounding.Builder rounding = new Rounding.Builder(TimeValue.timeValueMillis(duration.toMillis())); rounding.timeZone(timeZone); + rounding.offset(offset); if (min != null && max != null) { return rounding.build().prepare(min, max); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketOffsetTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketOffsetTests.java new file mode 100644 index 0000000000000..70c564357e043 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketOffsetTests.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.grouping; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.capabilities.ConfigurationAware; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.tree.Source; + +import java.time.Duration; +import java.time.Instant; + +public class BucketOffsetTests extends ESTestCase { + + public void testDurationSpanUsesOffset() { + Bucket bucket = new Bucket( + Source.EMPTY, + Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")), + Literal.timeDuration(Source.EMPTY, Duration.ofHours(1)), + null, + null, + ConfigurationAware.CONFIGURATION_MARKER, + Duration.ofMinutes(30).toMillis() + ); + + long value = Instant.parse("2024-01-01T01:20:00Z").toEpochMilli(); + long rounded = bucket.getDateRounding(FoldContext.small(), null, null).round(value); + assertEquals(Instant.parse("2024-01-01T00:30:00Z").toEpochMilli(), rounded); + } + + public void testAutoSpanIgnoresOffset() { + // The auto-span (numeric bucket count) path does not support offset yet. + // TSTEP only uses the duration span path where offset is applied. + Bucket bucket = new Bucket( + Source.EMPTY, + Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")), + Literal.integer(Source.EMPTY, 4), + Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")), + Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T04:00:00Z")), + ConfigurationAware.CONFIGURATION_MARKER, + Duration.ofMinutes(30).toMillis() + ); + + long value = Instant.parse("2024-01-01T01:20:00Z").toEpochMilli(); + long rounded = bucket.getDateRounding(FoldContext.small(), null, null).round(value); + assertEquals(Instant.parse("2024-01-01T01:00:00Z").toEpochMilli(), rounded); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java index 5cb7b91e8a191..c72a13acf660e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java @@ -7,12 +7,19 @@ package org.elasticsearch.xpack.esql.expression.function.grouping; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.test.TransportVersionUtils; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests; import org.elasticsearch.xpack.esql.session.Configuration; import java.io.IOException; +import java.time.Duration; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; public class BucketSerializationTests extends AbstractExpressionSerializationTests { @Override @@ -26,7 +33,8 @@ public static Bucket createRandomBucket(Configuration configuration) { Expression buckets = randomChild(); Expression from = randomChild(); Expression to = randomChild(); - return new Bucket(source, field, buckets, from, to, configuration); + long offset = randomLongBetween(-Duration.ofDays(1).toMillis(), Duration.ofDays(1).toMillis()); + return new Bucket(source, field, buckets, from, to, configuration, offset); } @Override @@ -36,12 +44,39 @@ protected Bucket mutateInstance(Bucket instance) throws IOException { Expression buckets = instance.buckets(); Expression from = instance.from(); Expression to = instance.to(); - switch (between(0, 3)) { + long offset = instance.offset(); + switch (between(0, 4)) { case 0 -> field = randomValueOtherThan(field, AbstractExpressionSerializationTests::randomChild); case 1 -> buckets = randomValueOtherThan(buckets, AbstractExpressionSerializationTests::randomChild); case 2 -> from = randomValueOtherThan(from, AbstractExpressionSerializationTests::randomChild); case 3 -> to = randomValueOtherThan(to, AbstractExpressionSerializationTests::randomChild); + case 4 -> offset = randomValueOtherThan( + offset, + () -> randomLongBetween(-Duration.ofDays(1).toMillis(), Duration.ofDays(1).toMillis()) + ); } - return new Bucket(source, field, buckets, from, to, configuration()); + return new Bucket(source, field, buckets, from, to, configuration(), offset); + } + + public void testOffsetBackcompatSerialization() throws IOException { + Bucket instance = new Bucket(randomSource(), randomChild(), randomChild(), randomChild(), randomChild(), configuration(), 0L); + TransportVersion oldVersion = TransportVersionUtils.getPreviousVersion(Bucket.ESQL_BUCKET_OFFSET); + Bucket copy = copyInstance(instance, oldVersion); + assertThat(copy.offset(), equalTo(0L)); + } + + public void testOffsetBackcompatSerializationRejectsNonZeroOffset() throws IOException { + Bucket instance = new Bucket( + randomSource(), + randomChild(), + randomChild(), + randomChild(), + randomChild(), + configuration(), + randomLongBetween(1, 1000) + ); + TransportVersion oldVersion = TransportVersionUtils.getPreviousVersion(Bucket.ESQL_BUCKET_OFFSET); + EsqlIllegalArgumentException e = expectThrows(EsqlIllegalArgumentException.class, () -> copyInstance(instance, oldVersion)); + assertThat(e.getMessage(), containsString("bucket with offset is not supported in peer node's version [" + oldVersion + "]")); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java index 592c5a9c4915d..9f34c6d9c222e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java @@ -518,6 +518,9 @@ public void accept(Page page) { if (argClass == int.class) { return randomInt(); } + if (argClass == long.class) { + return randomLong(); + } if (argClass == String.class) { // Nor strings return randomAlphaOfLength(5);