Skip to content

Commit 558b7c0

Browse files
committed
Add offset support to BUCKET function for step-aligned time grouping
1 parent af22a82 commit 558b7c0

File tree

7 files changed

+148
-18
lines changed

7 files changed

+148
-18
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9327000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
azure_openai_oauth_settings,9326000
1+
esql_bucket_offset,9327000

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.expression.function.grouping;
99

1010
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.TransportVersion;
1112
import org.elasticsearch.common.Rounding;
1213
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1314
import org.elasticsearch.common.io.stream.StreamInput;
@@ -67,6 +68,7 @@ public class Bucket extends GroupingFunction.EvaluatableGroupingFunction
6768
TwoOptionalArguments,
6869
ConfigurationFunction {
6970
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Bucket", Bucket::new);
71+
public static final TransportVersion ESQL_BUCKET_OFFSET = TransportVersion.fromName("esql_bucket_offset");
7072

7173
// TODO maybe we should just cover the whole of representable dates here - like ten years, 100 years, 1000 years, all the way up.
7274
// That way you never end up with more than the target number of buckets.
@@ -113,6 +115,7 @@ public Rounding apply(ZoneId zoneId) {
113115
private final Expression buckets;
114116
private final Expression from;
115117
private final Expression to;
118+
private final long offset;
116119

117120
@FunctionInfo(
118121
returnType = { "double", "date", "date_nanos" },
@@ -233,13 +236,26 @@ public Bucket(
233236
description = "End of the range. Can be a number, a date or a date expressed as a string."
234237
) Expression to,
235238
Configuration configuration
239+
) {
240+
this(source, field, buckets, from, to, configuration, 0L);
241+
}
242+
243+
public Bucket(
244+
Source source,
245+
Expression field,
246+
Expression buckets,
247+
Expression from,
248+
Expression to,
249+
Configuration configuration,
250+
long offset
236251
) {
237252
super(source, fields(field, buckets, from, to));
238253
this.field = field;
239254
this.buckets = buckets;
240255
this.from = from;
241256
this.to = to;
242257
this.configuration = configuration;
258+
this.offset = offset;
243259
}
244260

245261
private Bucket(StreamInput in) throws IOException {
@@ -249,7 +265,8 @@ private Bucket(StreamInput in) throws IOException {
249265
in.readNamedWriteable(Expression.class),
250266
in.readOptionalNamedWriteable(Expression.class),
251267
in.readOptionalNamedWriteable(Expression.class),
252-
((PlanStreamInput) in).configuration()
268+
((PlanStreamInput) in).configuration(),
269+
in.getTransportVersion().supports(ESQL_BUCKET_OFFSET) ? in.readZLong() : 0L
253270
);
254271
}
255272

@@ -273,6 +290,16 @@ public void writeTo(StreamOutput out) throws IOException {
273290
out.writeNamedWriteable(buckets);
274291
out.writeOptionalNamedWriteable(from);
275292
out.writeOptionalNamedWriteable(to);
293+
TransportVersion transportVersion = out.getTransportVersion();
294+
if (transportVersion.supports(ESQL_BUCKET_OFFSET)) {
295+
out.writeZLong(offset);
296+
} else if (offset != 0L) {
297+
throw new EsqlIllegalArgumentException(
298+
"bucket with offset is not supported in peer node's version [{}]. Upgrade to version [{}] or newer.",
299+
transportVersion,
300+
ESQL_BUCKET_OFFSET
301+
);
302+
}
276303
}
277304

278305
@Override
@@ -334,16 +361,16 @@ public Rounding.Prepared getDateRounding(FoldContext foldContext, Long min, Long
334361
long f = foldToLong(foldContext, from);
335362
long t = foldToLong(foldContext, to);
336363
if (min != null && max != null) {
337-
return new DateRoundingPicker(b, f, t, configuration.zoneId()).pickRounding().prepare(min, max);
364+
return new DateRoundingPicker(b, f, t, configuration.zoneId(), offset).pickRounding().prepare(min, max);
338365
}
339-
return new DateRoundingPicker(b, f, t, configuration.zoneId()).pickRounding().prepareForUnknown();
366+
return new DateRoundingPicker(b, f, t, configuration.zoneId(), offset).pickRounding().prepareForUnknown();
340367
} else {
341368
assert DataType.isTemporalAmount(buckets.dataType()) : "Unexpected span data type [" + buckets.dataType() + "]";
342-
return DateTrunc.createRounding(buckets.fold(foldContext), configuration.zoneId(), min, max);
369+
return DateTrunc.createRounding(buckets.fold(foldContext), configuration.zoneId(), min, max, offset);
343370
}
344371
}
345372

346-
private record DateRoundingPicker(int buckets, long from, long to, ZoneId zoneId) {
373+
private record DateRoundingPicker(int buckets, long from, long to, ZoneId zoneId, long offset) {
347374
Rounding pickRounding() {
348375
Rounding best = findLastOk(DAY_OF_MONTH_OR_FINER);
349376
if (best != null) {
@@ -511,12 +538,12 @@ public DataType dataType() {
511538
public Expression replaceChildren(List<Expression> newChildren) {
512539
Expression from = newChildren.size() > 2 ? newChildren.get(2) : null;
513540
Expression to = newChildren.size() > 3 ? newChildren.get(3) : null;
514-
return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to, configuration);
541+
return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to, configuration, offset);
515542
}
516543

517544
@Override
518545
protected NodeInfo<? extends Expression> info() {
519-
return NodeInfo.create(this, Bucket::new, field, buckets, from, to, configuration);
546+
return NodeInfo.create(this, Bucket::new, field, buckets, from, to, configuration, offset);
520547
}
521548

522549
public Expression field() {
@@ -535,14 +562,18 @@ public Expression to() {
535562
return to;
536563
}
537564

565+
public long offset() {
566+
return offset;
567+
}
568+
538569
@Override
539570
public String toString() {
540-
return "Bucket{" + "field=" + field + ", buckets=" + buckets + ", from=" + from + ", to=" + to + '}';
571+
return "Bucket{" + "field=" + field + ", buckets=" + buckets + ", from=" + from + ", to=" + to + ", offset=" + offset + '}';
541572
}
542573

543574
@Override
544575
public int hashCode() {
545-
return Objects.hash(getClass(), children(), configuration);
576+
return Objects.hash(getClass(), children(), configuration, offset);
546577
}
547578

548579
@Override
@@ -552,6 +583,6 @@ public boolean equals(Object obj) {
552583
}
553584
Bucket other = (Bucket) obj;
554585

555-
return configuration.equals(other.configuration);
586+
return configuration.equals(other.configuration) && offset == other.offset;
556587
}
557588
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/date/DateTrunc.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,19 @@ public boolean foldable() {
168168
}
169169

170170
public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone, Long min, Long max) {
171+
return createRounding(interval, timeZone, min, max, 0L);
172+
}
173+
174+
public static Rounding.Prepared createRounding(final Object interval, final ZoneId timeZone, Long min, Long max, long offset) {
171175
if (interval instanceof Period period) {
172-
return createRounding(period, timeZone, min, max);
176+
return createRounding(period, timeZone, min, max, offset);
173177
} else if (interval instanceof Duration duration) {
174-
return createRounding(duration, timeZone, min, max);
178+
return createRounding(duration, timeZone, min, max, offset);
175179
}
176180
throw new IllegalArgumentException("Time interval is not supported");
177181
}
178182

179-
private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone, Long min, Long max) {
183+
private static Rounding.Prepared createRounding(final Period period, final ZoneId timeZone, Long min, Long max, long offset) {
180184
// Zero or negative intervals are not supported
181185
if (period == null || period.isNegative() || period.isZero()) {
182186
throw new IllegalArgumentException("Zero or negative time interval is not supported");
@@ -217,6 +221,7 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI
217221
}
218222

219223
rounding.timeZone(timeZone);
224+
rounding.offset(offset);
220225
if (min != null && max != null && tryPrepareWithMinMax) {
221226
// Multiple quantities calendar interval - day/week/month/quarter/year is not supported by PreparedRounding.maybeUseArray,
222227
// which is called by prepare(min, max), as it may hit an assert. Call prepare(min, max) only for single calendar interval.
@@ -225,14 +230,15 @@ private static Rounding.Prepared createRounding(final Period period, final ZoneI
225230
return rounding.build().prepareForUnknown();
226231
}
227232

228-
private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone, Long min, Long max) {
233+
private static Rounding.Prepared createRounding(final Duration duration, final ZoneId timeZone, Long min, Long max, long offset) {
229234
// Zero or negative intervals are not supported
230235
if (duration == null || duration.isNegative() || duration.isZero()) {
231236
throw new IllegalArgumentException("Zero or negative time interval is not supported");
232237
}
233238

234239
final Rounding.Builder rounding = new Rounding.Builder(TimeValue.timeValueMillis(duration.toMillis()));
235240
rounding.timeZone(timeZone);
241+
rounding.offset(offset);
236242
if (min != null && max != null) {
237243
return rounding.build().prepare(min, max);
238244
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.grouping;
9+
10+
import org.elasticsearch.test.ESTestCase;
11+
import org.elasticsearch.xpack.esql.capabilities.ConfigurationAware;
12+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.Source;
15+
16+
import java.time.Duration;
17+
import java.time.Instant;
18+
19+
public class BucketOffsetTests extends ESTestCase {
20+
21+
public void testDurationSpanUsesOffset() {
22+
Bucket bucket = new Bucket(
23+
Source.EMPTY,
24+
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")),
25+
Literal.timeDuration(Source.EMPTY, Duration.ofHours(1)),
26+
null,
27+
null,
28+
ConfigurationAware.CONFIGURATION_MARKER,
29+
Duration.ofMinutes(30).toMillis()
30+
);
31+
32+
long value = Instant.parse("2024-01-01T01:20:00Z").toEpochMilli();
33+
long rounded = bucket.getDateRounding(FoldContext.small(), null, null).round(value);
34+
assertEquals(Instant.parse("2024-01-01T00:30:00Z").toEpochMilli(), rounded);
35+
}
36+
37+
public void testAutoSpanIgnoresOffset() {
38+
// The auto-span (numeric bucket count) path does not support offset yet.
39+
// TSTEP only uses the duration span path where offset is applied.
40+
Bucket bucket = new Bucket(
41+
Source.EMPTY,
42+
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")),
43+
Literal.integer(Source.EMPTY, 4),
44+
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T00:00:00Z")),
45+
Literal.dateTime(Source.EMPTY, Instant.parse("2024-01-01T04:00:00Z")),
46+
ConfigurationAware.CONFIGURATION_MARKER,
47+
Duration.ofMinutes(30).toMillis()
48+
);
49+
50+
long value = Instant.parse("2024-01-01T01:20:00Z").toEpochMilli();
51+
long rounded = bucket.getDateRounding(FoldContext.small(), null, null).round(value);
52+
assertEquals(Instant.parse("2024-01-01T01:00:00Z").toEpochMilli(), rounded);
53+
}
54+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,19 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.grouping;
99

10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.test.TransportVersionUtils;
12+
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
1013
import org.elasticsearch.xpack.esql.core.expression.Expression;
1114
import org.elasticsearch.xpack.esql.core.tree.Source;
1215
import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests;
1316
import org.elasticsearch.xpack.esql.session.Configuration;
1417

1518
import java.io.IOException;
19+
import java.time.Duration;
20+
21+
import static org.hamcrest.Matchers.containsString;
22+
import static org.hamcrest.Matchers.equalTo;
1623

1724
public class BucketSerializationTests extends AbstractExpressionSerializationTests<Bucket> {
1825
@Override
@@ -26,7 +33,8 @@ public static Bucket createRandomBucket(Configuration configuration) {
2633
Expression buckets = randomChild();
2734
Expression from = randomChild();
2835
Expression to = randomChild();
29-
return new Bucket(source, field, buckets, from, to, configuration);
36+
long offset = randomLongBetween(-Duration.ofDays(1).toMillis(), Duration.ofDays(1).toMillis());
37+
return new Bucket(source, field, buckets, from, to, configuration, offset);
3038
}
3139

3240
@Override
@@ -36,12 +44,39 @@ protected Bucket mutateInstance(Bucket instance) throws IOException {
3644
Expression buckets = instance.buckets();
3745
Expression from = instance.from();
3846
Expression to = instance.to();
39-
switch (between(0, 3)) {
47+
long offset = instance.offset();
48+
switch (between(0, 4)) {
4049
case 0 -> field = randomValueOtherThan(field, AbstractExpressionSerializationTests::randomChild);
4150
case 1 -> buckets = randomValueOtherThan(buckets, AbstractExpressionSerializationTests::randomChild);
4251
case 2 -> from = randomValueOtherThan(from, AbstractExpressionSerializationTests::randomChild);
4352
case 3 -> to = randomValueOtherThan(to, AbstractExpressionSerializationTests::randomChild);
53+
case 4 -> offset = randomValueOtherThan(
54+
offset,
55+
() -> randomLongBetween(-Duration.ofDays(1).toMillis(), Duration.ofDays(1).toMillis())
56+
);
4457
}
45-
return new Bucket(source, field, buckets, from, to, configuration());
58+
return new Bucket(source, field, buckets, from, to, configuration(), offset);
59+
}
60+
61+
public void testOffsetBackcompatSerialization() throws IOException {
62+
Bucket instance = new Bucket(randomSource(), randomChild(), randomChild(), randomChild(), randomChild(), configuration(), 0L);
63+
TransportVersion oldVersion = TransportVersionUtils.getPreviousVersion(Bucket.ESQL_BUCKET_OFFSET);
64+
Bucket copy = copyInstance(instance, oldVersion);
65+
assertThat(copy.offset(), equalTo(0L));
66+
}
67+
68+
public void testOffsetBackcompatSerializationRejectsNonZeroOffset() throws IOException {
69+
Bucket instance = new Bucket(
70+
randomSource(),
71+
randomChild(),
72+
randomChild(),
73+
randomChild(),
74+
randomChild(),
75+
configuration(),
76+
randomLongBetween(1, 1000)
77+
);
78+
TransportVersion oldVersion = TransportVersionUtils.getPreviousVersion(Bucket.ESQL_BUCKET_OFFSET);
79+
EsqlIllegalArgumentException e = expectThrows(EsqlIllegalArgumentException.class, () -> copyInstance(instance, oldVersion));
80+
assertThat(e.getMessage(), containsString("bucket with offset is not supported in peer node's version [" + oldVersion + "]"));
4681
}
4782
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,9 @@ public void accept(Page page) {
518518
if (argClass == int.class) {
519519
return randomInt();
520520
}
521+
if (argClass == long.class) {
522+
return randomLong();
523+
}
521524
if (argClass == String.class) {
522525
// Nor strings
523526
return randomAlphaOfLength(5);

0 commit comments

Comments
 (0)