Skip to content

Commit 8062483

Browse files
committed
pass the correct timestamp ref into translate rule
1 parent 244decd commit 8062483

File tree

7 files changed

+55
-19
lines changed

7 files changed

+55
-19
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,21 +560,31 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
560560
* This function is meant to deal with the implicit timestamp fields that some TS functions use.
561561
*/
562562
private TimeSeriesAggregate resolveTimeSeriesAggregate(TimeSeriesAggregate timeSeriesAggregate, List<Attribute> childrenOutput) {
563-
if (childrenOutput.stream().noneMatch(attr -> attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD))) {
564-
// We only need to do something if there isn't a timestamp field in our output
563+
Attribute tsAtter = null;
564+
for (int i = 0; i < childrenOutput.size(); i++) {
565+
if (childrenOutput.get(i).name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
566+
tsAtter = childrenOutput.get(i);
567+
break;
568+
}
569+
}
570+
if (tsAtter == null) {
571+
// if we didn't find a timestamp in the children output, time to do more work
565572
Holder<String> tsAttributeName = new Holder<>(MetadataAttribute.TIMESTAMP_FIELD);
573+
Holder<Attribute> tsAttribute = new Holder<>();
566574
timeSeriesAggregate.forEachExpressionUp(Alias.class, a -> {
567575
if (a.child() instanceof Attribute c) {
568576
// will this ever not be true?
569577
if (c.name().equals(tsAttributeName.get())) {
570578
tsAttributeName.set(a.name());
579+
tsAttribute.set(a.toAttribute());
571580
}
572581
}
573582
});
574583

575584
// Now we know what timestamp is going to be called, replace our UnresolvedAttributes referencing timestamp with that name
576585
List<Expression> newGroupings = new ArrayList<>(timeSeriesAggregate.groupings().size());
577586
List<NamedExpression> newAggregates = new ArrayList<>(timeSeriesAggregate.aggregates().size());
587+
// TODO: Can we just resolve these here? we have the attribute
578588
for (int i = 0; i < timeSeriesAggregate.groupings().size(); i++) {
579589
newGroupings.add(timeSeriesAggregate.groupings().get(i).transformUp(UnresolvedAttribute.class, ua -> {
580590
if (ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
@@ -594,7 +604,9 @@ private TimeSeriesAggregate resolveTimeSeriesAggregate(TimeSeriesAggregate timeS
594604
})
595605
);
596606
}
597-
timeSeriesAggregate = (TimeSeriesAggregate) timeSeriesAggregate.with(newGroupings, newAggregates);
607+
timeSeriesAggregate = timeSeriesAggregate.with(newGroupings, newAggregates, tsAttribute.get());
608+
} else {
609+
timeSeriesAggregate = timeSeriesAggregate.with(timeSeriesAggregate.groupings(), timeSeriesAggregate.aggregates(), tsAtter);
598610
}
599611

600612
// After correcting the timestamps, we still need to resolve the node as normal, so delegate to resolveAggregate

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
165165
if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
166166
tsid.set(attr);
167167
}
168-
if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
169-
timestamp.set(attr);
170-
}
171168
}
172169
});
173170
if (tsid.get() == null) {
174171
tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false));
175172
}
176-
if (timestamp.get() == null) {
173+
if (aggregate.timestamp() == null) {
177174
throw new IllegalArgumentException("_tsid or @timestamp field are missing from the time-series source");
178175
}
179176
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
@@ -216,7 +213,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
216213
} else {
217214
// TODO: reject over_time_aggregation only
218215
final Expression aggField = af.field();
219-
var tsAgg = new LastOverTime(af.source(), aggField, timestamp.get());
216+
var tsAgg = new LastOverTime(af.source(), aggField, aggregate.timestamp());
220217
final AggregateFunction firstStageFn;
221218
if (inlineFilter != null) {
222219
firstStageFn = tsAgg.perTimeSeriesAggregation().withFilter(inlineFilter);
@@ -247,12 +244,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
247244
Holder<NamedExpression> timeBucketRef = new Holder<>();
248245
aggregate.child().forEachExpressionUp(NamedExpression.class, e -> {
249246
for (Expression child : e.children()) {
250-
if (child instanceof Bucket bucket && bucket.field().equals(timestamp.get())) {
247+
if (child instanceof Bucket bucket && bucket.field().equals(aggregate.timestamp())) {
251248
if (timeBucketRef.get() != null) {
252249
throw new IllegalArgumentException("expected at most one time bucket");
253250
}
254251
timeBucketRef.set(e);
255-
} else if (child instanceof TBucket tbucket && tbucket.field().equals(timestamp.get())) {
252+
} else if (child instanceof TBucket tbucket && tbucket.field().equals(aggregate.timestamp())) {
256253
if (timeBucketRef.get() != null) {
257254
throw new IllegalArgumentException("expected at most one time tbucket");
258255
}
@@ -296,7 +293,8 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
296293
newChild,
297294
firstPassGroupings,
298295
mergeExpressions(firstPassAggs, firstPassGroupings),
299-
(Bucket) Alias.unwrap(timeBucket)
296+
(Bucket) Alias.unwrap(timeBucket),
297+
aggregate.timestamp()
300298
);
301299
return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings));
302300
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
362362
return input -> {
363363
if (input.anyMatch(p -> p instanceof Aggregate) == false
364364
&& input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
365-
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null);
365+
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null, null);
366366
} else {
367367
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
368368
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.core.Nullable;
1313
import org.elasticsearch.xpack.esql.common.Failures;
1414
import org.elasticsearch.xpack.esql.core.expression.Alias;
15+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1516
import org.elasticsearch.xpack.esql.core.expression.Expression;
1617
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1718
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
@@ -41,21 +42,30 @@ public class TimeSeriesAggregate extends Aggregate {
4142
);
4243

4344
private final Bucket timeBucket;
45+
/**
46+
* this field is used by {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate} to help with
47+
* resolving the timestamp field, but should not be needed after the initial logical planning on the coordinating node. As such,
48+
* it is not serialized.
49+
*/
50+
private final Attribute timestamp;
4451

4552
public TimeSeriesAggregate(
4653
Source source,
4754
LogicalPlan child,
4855
List<Expression> groupings,
4956
List<? extends NamedExpression> aggregates,
50-
Bucket timeBucket
57+
Bucket timeBucket,
58+
Attribute timestamp
5159
) {
5260
super(source, child, groupings, aggregates);
5361
this.timeBucket = timeBucket;
62+
this.timestamp = timestamp;
5463
}
5564

5665
public TimeSeriesAggregate(StreamInput in) throws IOException {
5766
super(in);
5867
this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp));
68+
this.timestamp = null;
5969
}
6070

6171
@Override
@@ -71,17 +81,21 @@ public String getWriteableName() {
7181

7282
@Override
7383
protected NodeInfo<Aggregate> info() {
74-
return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket);
84+
return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket, timestamp);
7585
}
7686

7787
@Override
7888
public TimeSeriesAggregate replaceChild(LogicalPlan newChild) {
79-
return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket);
89+
return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket, timestamp);
8090
}
8191

8292
@Override
8393
public TimeSeriesAggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
84-
return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket);
94+
return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket, timestamp);
95+
}
96+
97+
public TimeSeriesAggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates, Attribute newTimestamp) {
98+
return new TimeSeriesAggregate(source(), child(), newGroupings, newAggregates, timeBucket, newTimestamp);
8599
}
86100

87101
@Override
@@ -94,6 +108,10 @@ public Bucket timeBucket() {
94108
return timeBucket;
95109
}
96110

111+
public Attribute timestamp() {
112+
return timestamp;
113+
}
114+
97115
@Override
98116
public int hashCode() {
99117
return Objects.hash(groupings, aggregates, child(), timeBucket);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregateTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,9 @@ public void testRateWithRename() {
187187

188188
TimeSeriesAggregate innerStats = as(outerStats.child(), TimeSeriesAggregate.class);
189189
assertEquals(2, innerStats.groupings().size());
190-
assertEquals(timeBucketGroup, innerStats.groupings().get(1));
191190
Attribute tsidGroup = as(innerStats.groupings().get(0), Attribute.class);
192191
assertEquals("_tsid", tsidGroup.name());
192+
assertEquals(timeBucketGroup, innerStats.groupings().get(1));
193193

194194
assertEquals(2, innerStats.aggregates().size());
195195
Alias innerAggFunction = as(innerStats.aggregates().get(0), Alias.class);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2496,6 +2496,7 @@ public void testSimpleMetricsWithStats() {
24962496
new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))),
24972497
attribute("ts")
24982498
),
2499+
null,
24992500
null
25002501
)
25012502
);
@@ -2509,6 +2510,7 @@ public void testSimpleMetricsWithStats() {
25092510
new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))),
25102511
attribute("ts")
25112512
),
2513+
null,
25122514
null
25132515
)
25142516
);
@@ -2532,6 +2534,7 @@ public void testSimpleMetricsWithStats() {
25322534
),
25332535
attribute("ts")
25342536
),
2537+
null,
25352538
null
25362539
)
25372540
);
@@ -2542,6 +2545,7 @@ public void testSimpleMetricsWithStats() {
25422545
unresolvedTSRelation("foo*"),
25432546
List.of(),
25442547
List.of(new Alias(EMPTY, "count(errors)", new UnresolvedFunction(EMPTY, "count", DEFAULT, List.of(attribute("errors"))))),
2548+
null,
25452549
null
25462550
)
25472551
);
@@ -2552,6 +2556,7 @@ public void testSimpleMetricsWithStats() {
25522556
unresolvedTSRelation("foo*"),
25532557
List.of(),
25542558
List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))),
2559+
null,
25552560
null
25562561
)
25572562
);
@@ -2562,6 +2567,7 @@ public void testSimpleMetricsWithStats() {
25622567
unresolvedTSRelation("foo*"),
25632568
List.of(),
25642569
List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))),
2570+
null,
25652571
null
25662572
)
25672573
);
@@ -2572,6 +2578,7 @@ public void testSimpleMetricsWithStats() {
25722578
unresolvedTSRelation("foo*"),
25732579
List.of(),
25742580
List.of(new Alias(EMPTY, "a1(b2)", new UnresolvedFunction(EMPTY, "a1", DEFAULT, List.of(attribute("b2"))))),
2581+
null,
25752582
null
25762583
)
25772584
);
@@ -2586,6 +2593,7 @@ public void testSimpleMetricsWithStats() {
25862593
attribute("c"),
25872594
attribute("d.e")
25882595
),
2596+
null,
25892597
null
25902598
)
25912599
);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ protected TimeSeriesAggregate createTestInstance() {
2424
List<Expression> groupings = randomFieldAttributes(0, 5, false).stream().map(a -> (Expression) a).toList();
2525
List<? extends NamedExpression> aggregates = AggregateSerializationTests.randomAggregates();
2626
Bucket timeBucket = BucketSerializationTests.createRandomBucket();
27-
return new TimeSeriesAggregate(source, child, groupings, aggregates, timeBucket);
27+
return new TimeSeriesAggregate(source, child, groupings, aggregates, timeBucket, null);
2828
}
2929

3030
@Override
@@ -42,7 +42,7 @@ protected TimeSeriesAggregate mutateInstance(TimeSeriesAggregate instance) throw
4242
case 2 -> aggregates = randomValueOtherThan(aggregates, AggregateSerializationTests::randomAggregates);
4343
case 3 -> timeBucket = randomValueOtherThan(timeBucket, BucketSerializationTests::createRandomBucket);
4444
}
45-
return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates, timeBucket);
45+
return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates, timeBucket, null);
4646
}
4747

4848
@Override

0 commit comments

Comments
 (0)