Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9327000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.4.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
azure_openai_oauth_settings,9326000
esql_bucket_offset,9327000
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.expression.function.grouping;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class Bucket extends GroupingFunction.EvaluatableGroupingFunction
TwoOptionalArguments,
ConfigurationFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Bucket", Bucket::new);
public static final TransportVersion ESQL_BUCKET_OFFSET = TransportVersion.fromName("esql_bucket_offset");

// TODO maybe we should just cover the whole of representable dates here - like ten years, 100 years, 1000 years, all the way up.
// That way you never end up with more than the target number of buckets.
Expand Down Expand Up @@ -113,6 +115,7 @@ public Rounding apply(ZoneId zoneId) {
private final Expression buckets;
private final Expression from;
private final Expression to;
private final long offset;

@FunctionInfo(
returnType = { "double", "date", "date_nanos" },
Expand Down Expand Up @@ -233,13 +236,26 @@ public Bucket(
description = "End of the range. Can be a number, a date or a date expressed as a string."
) Expression to,
Configuration configuration
) {
this(source, field, buckets, from, to, configuration, 0L);
}

public Bucket(
Source source,
Expression field,
Expression buckets,
Expression from,
Expression to,
Configuration configuration,
long offset
) {
super(source, fields(field, buckets, from, to));
this.field = field;
this.buckets = buckets;
this.from = from;
this.to = to;
this.configuration = configuration;
this.offset = offset;
}

private Bucket(StreamInput in) throws IOException {
Expand All @@ -249,7 +265,8 @@ private Bucket(StreamInput in) throws IOException {
in.readNamedWriteable(Expression.class),
in.readOptionalNamedWriteable(Expression.class),
in.readOptionalNamedWriteable(Expression.class),
((PlanStreamInput) in).configuration()
((PlanStreamInput) in).configuration(),
in.getTransportVersion().supports(ESQL_BUCKET_OFFSET) ? in.readZLong() : 0L
);
}

Expand All @@ -273,6 +290,16 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(buckets);
out.writeOptionalNamedWriteable(from);
out.writeOptionalNamedWriteable(to);
TransportVersion transportVersion = out.getTransportVersion();
if (transportVersion.supports(ESQL_BUCKET_OFFSET)) {
out.writeZLong(offset);
} else if (offset != 0L) {
throw new EsqlIllegalArgumentException(
"bucket with offset is not supported in peer node's version [{}]. Upgrade to version [{}] or newer.",
transportVersion,
ESQL_BUCKET_OFFSET
);
}
}

@Override
Expand Down Expand Up @@ -334,16 +361,16 @@ public Rounding.Prepared getDateRounding(FoldContext foldContext, Long min, Long
long f = foldToLong(foldContext, from);
long t = foldToLong(foldContext, to);
if (min != null && max != null) {
return new DateRoundingPicker(b, f, t, configuration.zoneId()).pickRounding().prepare(min, max);
return new DateRoundingPicker(b, f, t, configuration.zoneId(), offset).pickRounding().prepare(min, max);
}
return new DateRoundingPicker(b, f, t, configuration.zoneId()).pickRounding().prepareForUnknown();
return new DateRoundingPicker(b, f, t, configuration.zoneId(), offset).pickRounding().prepareForUnknown();
} else {
assert DataType.isTemporalAmount(buckets.dataType()) : "Unexpected span data type [" + buckets.dataType() + "]";
return DateTrunc.createRounding(buckets.fold(foldContext), configuration.zoneId(), min, max);
return DateTrunc.createRounding(buckets.fold(foldContext), configuration.zoneId(), min, max, offset);
}
}

private record DateRoundingPicker(int buckets, long from, long to, ZoneId zoneId) {
private record DateRoundingPicker(int buckets, long from, long to, ZoneId zoneId, long offset) {
Rounding pickRounding() {
Rounding best = findLastOk(DAY_OF_MONTH_OR_FINER);
if (best != null) {
Expand Down Expand Up @@ -511,12 +538,12 @@ public DataType dataType() {
public Expression replaceChildren(List<Expression> newChildren) {
Expression from = newChildren.size() > 2 ? newChildren.get(2) : null;
Expression to = newChildren.size() > 3 ? newChildren.get(3) : null;
return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to, configuration);
return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to, configuration, offset);
}

@Override
protected NodeInfo<? extends Expression> info() {
return NodeInfo.create(this, Bucket::new, field, buckets, from, to, configuration);
return NodeInfo.create(this, Bucket::new, field, buckets, from, to, configuration, offset);
}

public Expression field() {
Expand All @@ -535,14 +562,18 @@ public Expression to() {
return to;
}

public long offset() {
return offset;
}

@Override
public String toString() {
return "Bucket{" + "field=" + field + ", buckets=" + buckets + ", from=" + from + ", to=" + to + '}';
return "Bucket{" + "field=" + field + ", buckets=" + buckets + ", from=" + from + ", to=" + to + ", offset=" + offset + '}';
}

@Override
public int hashCode() {
return Objects.hash(getClass(), children(), configuration);
return Objects.hash(getClass(), children(), configuration, offset);
}

@Override
Expand All @@ -552,6 +583,6 @@ public boolean equals(Object obj) {
}
Bucket other = (Bucket) obj;

return configuration.equals(other.configuration);
return configuration.equals(other.configuration) && offset == other.offset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,19 @@ public boolean foldable() {
}

public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone, Long min, Long max) {
return createRounding(interval, timeZone, min, max, 0L);
}

public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone, Long min, Long max, long offset) {
if (interval instanceof Period period) {
return createRounding(period, timeZone, min, max);
return createRounding(period, timeZone, min, max, offset);
} else if (interval instanceof Duration duration) {
return createRounding(duration, timeZone, min, max);
return createRounding(duration, timeZone, min, max, offset);
}
throw new IllegalArgumentException("Time interval is not supported");
}

private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone, Long min, Long max) {
private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone, Long min, Long max, long offset) {
// Zero or negative intervals are not supported
if (period == null || period.isNegative() || period.isZero()) {
throw new IllegalArgumentException("Zero or negative time interval is not supported");
Expand Down Expand Up @@ -217,6 +221,7 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI
}

rounding.timeZone(timeZone);
rounding.offset(offset);
if (min != null && max != null && tryPrepareWithMinMax) {
// Multiple quantities calendar interval - day/week/month/quarter/year is not supported by PreparedRounding.maybeUseArray,
// which is called by prepare(min, max), as it may hit an assert. Call prepare(min, max) only for single calendar interval.
Expand All @@ -225,14 +230,15 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI
return rounding.build().prepareForUnknown();
}

private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone, Long min, Long max) {
private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone, Long min, Long max, long offset) {
// Zero or negative intervals are not supported
if (duration == null || duration.isNegative() || duration.isZero()) {
throw new IllegalArgumentException("Zero or negative time interval is not supported");
}

final Rounding.Builder rounding = new Rounding.Builder(TimeValue.timeValueMillis(duration.toMillis()));
rounding.timeZone(timeZone);
rounding.offset(offset);
if (min != null && max != null) {
return rounding.build().prepare(min, max);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.grouping;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.capabilities.ConfigurationAware;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.time.Duration;
import java.time.Instant;

public class BucketOffsetTests extends ESTestCase {

public void testDurationSpanUsesOffset() {
Bucket bucket = new Bucket(
Source.EMPTY,
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")),
Literal.timeDuration(Source.EMPTY, Duration.ofHours(1)),
null,
null,
ConfigurationAware.CONFIGURATION_MARKER,
Duration.ofMinutes(30).toMillis()
);

long value = Instant.parse("2024-01-01T01:20:00Z").toEpochMilli();
long rounded = bucket.getDateRounding(FoldContext.small(), null, null).round(value);
assertEquals(Instant.parse("2024-01-01T00:30:00Z").toEpochMilli(), rounded);
}

public void testAutoSpanIgnoresOffset() {
// The auto-span (numeric bucket count) path does not support offset yet.
// TSTEP only uses the duration span path where offset is applied.
Bucket bucket = new Bucket(
Source.EMPTY,
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")),
Literal.integer(Source.EMPTY, 4),
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")),
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T04:00:00Z")),
ConfigurationAware.CONFIGURATION_MARKER,
Duration.ofMinutes(30).toMillis()
);

long value = Instant.parse("2024-01-01T01:20:00Z").toEpochMilli();
long rounded = bucket.getDateRounding(FoldContext.small(), null, null).round(value);
assertEquals(Instant.parse("2024-01-01T01:00:00Z").toEpochMilli(), rounded);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@

package org.elasticsearch.xpack.esql.expression.function.grouping;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests;
import org.elasticsearch.xpack.esql.session.Configuration;

import java.io.IOException;
import java.time.Duration;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class BucketSerializationTests extends AbstractExpressionSerializationTests<Bucket> {
@Override
Expand All @@ -26,7 +33,8 @@ public static Bucket createRandomBucket(Configuration configuration) {
Expression buckets = randomChild();
Expression from = randomChild();
Expression to = randomChild();
return new Bucket(source, field, buckets, from, to, configuration);
long offset = randomLongBetween(-Duration.ofDays(1).toMillis(), Duration.ofDays(1).toMillis());
return new Bucket(source, field, buckets, from, to, configuration, offset);
}

@Override
Expand All @@ -36,12 +44,39 @@ protected Bucket mutateInstance(Bucket instance) throws IOException {
Expression buckets = instance.buckets();
Expression from = instance.from();
Expression to = instance.to();
switch (between(0, 3)) {
long offset = instance.offset();
switch (between(0, 4)) {
case 0 -> field = randomValueOtherThan(field, AbstractExpressionSerializationTests::randomChild);
case 1 -> buckets = randomValueOtherThan(buckets, AbstractExpressionSerializationTests::randomChild);
case 2 -> from = randomValueOtherThan(from, AbstractExpressionSerializationTests::randomChild);
case 3 -> to = randomValueOtherThan(to, AbstractExpressionSerializationTests::randomChild);
case 4 -> offset = randomValueOtherThan(
offset,
() -> randomLongBetween(-Duration.ofDays(1).toMillis(), Duration.ofDays(1).toMillis())
);
}
return new Bucket(source, field, buckets, from, to, configuration());
return new Bucket(source, field, buckets, from, to, configuration(), offset);
}

public void testOffsetBackcompatSerialization() throws IOException {
Bucket instance = new Bucket(randomSource(), randomChild(), randomChild(), randomChild(), randomChild(), configuration(), 0L);
TransportVersion oldVersion = TransportVersionUtils.getPreviousVersion(Bucket.ESQL_BUCKET_OFFSET);
Bucket copy = copyInstance(instance, oldVersion);
assertThat(copy.offset(), equalTo(0L));
}

public void testOffsetBackcompatSerializationRejectsNonZeroOffset() throws IOException {
Bucket instance = new Bucket(
randomSource(),
randomChild(),
randomChild(),
randomChild(),
randomChild(),
configuration(),
randomLongBetween(1, 1000)
);
TransportVersion oldVersion = TransportVersionUtils.getPreviousVersion(Bucket.ESQL_BUCKET_OFFSET);
EsqlIllegalArgumentException e = expectThrows(EsqlIllegalArgumentException.class, () -> copyInstance(instance, oldVersion));
assertThat(e.getMessage(), containsString("bucket with offset is not supported in peer node's version [" + oldVersion + "]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ public void accept(Page page) {
if (argClass == int.class) {
return randomInt();
}
if (argClass == long.class) {
return randomLong();
}
if (argClass == String.class) {
// Nor strings
return randomAlphaOfLength(5);
Expand Down
Loading