Skip to content

Commit 311f082

Browse files
committed
Rule to fix at least some of this
1 parent 83f8909 commit 311f082

File tree

5 files changed

+72
-9
lines changed

5 files changed

+72
-9
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,8 @@ cnt:long | cluster:keyword | pod:keyword
561561
;
562562

563563
Max of Rate with Bucket
564+
required_capability: ts_command_v0
565+
564566
TS k8s
565567
| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(@timestamp, 1hour)
566568
;
@@ -570,6 +572,8 @@ maxRate:double | tbucket:datetime
570572
;
571573

572574
Max of Rate with Bucket, Rename Timestamp
575+
required_capability: ts_command_v0
576+
573577
TS k8s
574578
| RENAME `@timestamp` AS newTs
575579
| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(newTs, 1hour)

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -488,10 +488,15 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
488488
// Gather all the children's output in case of non-unary plans; even for unaries, we need to copy because we may mutate this to
489489
// simplify resolution of e.g. RENAME.
490490
for (LogicalPlan child : plan.children()) {
491-
var output = child.output();
491+
List<Attribute> output = child.output();
492492
childrenOutput.addAll(output);
493493
}
494494

495+
if (plan instanceof TimeSeriesAggregate tsAggregate) {
496+
// NOTE: This MUST be checked before the Aggregate version, since TimeSeriesAggregate is a subclass of Aggregate
497+
return resolveTimeSeriesAggregate(tsAggregate, childrenOutput);
498+
}
499+
495500
if (plan instanceof Aggregate aggregate) {
496501
return resolveAggregate(aggregate, childrenOutput);
497502
}
@@ -551,6 +556,51 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
551556
return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
552557
}
553558

559+
/**
560+
* This function is meant to deal with the implicit timestamp fields that some TS functions use.
561+
*/
562+
private TimeSeriesAggregate resolveTimeSeriesAggregate(TimeSeriesAggregate timeSeriesAggregate, List<Attribute> childrenOutput) {
563+
if (childrenOutput.stream().noneMatch(attr -> attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD))) {
564+
// We only need to do something if there isn't a timestamp field in our output
565+
Holder<String> tsAttributeName = new Holder<>(MetadataAttribute.TIMESTAMP_FIELD);
566+
timeSeriesAggregate.forEachExpressionUp(Alias.class, a -> {
567+
if (a.child() instanceof Attribute c) {
568+
// will this ever not be true?
569+
if (c.name().equals(tsAttributeName.get())) {
570+
tsAttributeName.set(a.name());
571+
}
572+
}
573+
});
574+
575+
// Now we know what timestamp is going to be called, replace our UnresolvedAttributes referencing timestamp with that name
576+
List<Expression> newGroupings = new ArrayList<>(timeSeriesAggregate.groupings().size());
577+
List<NamedExpression> newAggregates = new ArrayList<>(timeSeriesAggregate.aggregates().size());
578+
for (int i = 0; i < timeSeriesAggregate.groupings().size(); i++) {
579+
newGroupings.add(timeSeriesAggregate.groupings().get(i).transformUp(UnresolvedAttribute.class, ua -> {
580+
if (ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
581+
return new UnresolvedAttribute(ua.source(), tsAttributeName.get());
582+
}
583+
return ua;
584+
}));
585+
}
586+
587+
for (int i = 0; i < timeSeriesAggregate.aggregates().size(); i++) {
588+
newAggregates.add(
589+
(NamedExpression) timeSeriesAggregate.aggregates().get(i).transformUp(UnresolvedAttribute.class, ua -> {
590+
if (ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
591+
return new UnresolvedAttribute(ua.source(), tsAttributeName.get());
592+
}
593+
return ua;
594+
})
595+
);
596+
}
597+
timeSeriesAggregate = (TimeSeriesAggregate) timeSeriesAggregate.with(newGroupings, newAggregates);
598+
}
599+
600+
// After correcting the timestamps, we still need to resolve the node as normal, so delegate to resolveAggregate
601+
return (TimeSeriesAggregate) resolveAggregate(timeSeriesAggregate, childrenOutput);
602+
}
603+
554604
private Aggregate resolveAggregate(Aggregate aggregate, List<Attribute> childrenOutput) {
555605
// if the grouping is resolved but the aggs are not, use the former to resolve the latter
556606
// e.g. STATS a ... GROUP BY a = x + 1
@@ -1083,7 +1133,7 @@ private Attribute resolveAttribute(UnresolvedAttribute ua, List<Attribute> child
10831133

10841134
private static Attribute resolveAttribute(UnresolvedAttribute ua, List<Attribute> childrenOutput, Logger logger) {
10851135
Attribute resolved = ua;
1086-
var named = resolveAgainstList(ua, childrenOutput);
1136+
List<Attribute> named = resolveAgainstList(ua, childrenOutput);
10871137
// if resolved, return it; otherwise keep it in place to be resolved later
10881138
if (named.size() == 1) {
10891139
resolved = named.get(0);
@@ -1253,9 +1303,9 @@ public static List<NamedExpression> projectionsForRename(Rename rename, List<Att
12531303
projections.removeIf(x -> x.name().equals(alias.name()));
12541304
childrenOutput.removeIf(x -> x.name().equals(alias.name()));
12551305

1256-
var resolved = maybeResolveAttribute(ua, childrenOutput, logger);
1306+
Attribute resolved = maybeResolveAttribute(ua, childrenOutput, logger);
12571307
if (resolved instanceof UnsupportedAttribute || resolved.resolved()) {
1258-
var realiased = (NamedExpression) alias.replaceChildren(List.of(resolved));
1308+
NamedExpression realiased = alias.replaceChildren(List.of(resolved));
12591309
projections.replaceAll(x -> x.equals(resolved) ? realiased : x);
12601310
childrenOutput.removeIf(x -> x.equals(resolved));
12611311
reverseAliasing.put(resolved.name(), alias.name());
@@ -1356,7 +1406,7 @@ private static List<Attribute> resolveAgainstList(UnresolvedNamePattern up, Coll
13561406
}
13571407

13581408
private static List<Attribute> resolveAgainstList(UnresolvedAttribute ua, Collection<Attribute> attrList) {
1359-
var matches = AnalyzerRules.maybeResolveAgainstList(ua, attrList, a -> Analyzer.handleSpecialFields(ua, a));
1409+
List<Attribute> matches = AnalyzerRules.maybeResolveAgainstList(ua, attrList, a -> Analyzer.handleSpecialFields(ua, a));
13601410
return potentialCandidatesIfNoMatchesFound(ua, matches, attrList, list -> UnresolvedAttribute.errorMessage(ua.name(), list));
13611411
}
13621412

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RuleUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public static AttributeMap<Expression> foldableReferences(LogicalPlan plan, Logi
8787

8888
// collect aliases bottom-up
8989
plan.forEachExpressionUp(Alias.class, a -> {
90-
var c = a.child();
90+
Expression c = a.child();
9191
boolean shouldCollect = c.foldable();
9292
// try to resolve the expression based on an existing foldables
9393
if (shouldCollect == false) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QueryPlan.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,15 @@ public <E extends Expression> void forEachExpressionDown(Class<? extends E> type
203203
forEachPropertyDown(Object.class, e -> doForEachExpression(e, exp -> exp.forEachDown(typeToken, rule)));
204204
}
205205

206+
/**
207+
* This traverses all {@link Expression} nodes for all children of the current node, applying the given function to each of them.
208+
* It does not transform (i.e. replace) those nodes, it just hands them to the consumer, which can read but not modify (since the
209+
* nodes are immutable)
210+
*
211+
* @param typeToken Only process expressions matching the given type
212+
* @param rule a non-modifying consumer which operates on the given token type
213+
* @param <E> the type of expression this pass will process
214+
*/
206215
public <E extends Expression> void forEachExpressionUp(Class<E> typeToken, Consumer<? super E> rule) {
207216
forEachPropertyUp(Object.class, e -> doForEachExpression(e, exp -> exp.forEachUp(typeToken, rule)));
208217
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregateTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void testAvgOfAvgOverTime() {
143143
EsRelation relation = as(bucketEval.child(), EsRelation.class);
144144
}
145145

146-
public void testRenameTimestampWithRate() {
146+
public void testRateWithRename() {
147147
assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled());
148148
LogicalPlan plan = planK8s("""
149149
TS k8s
@@ -152,7 +152,7 @@ public void testRenameTimestampWithRate() {
152152
""");
153153
}
154154

155-
public void testRenameTimestampWithOverTimeFunction() {
155+
public void testOverTimeFunctionWithRename() {
156156
assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled());
157157
LogicalPlan plan = planK8s("""
158158
TS k8s
@@ -161,7 +161,7 @@ public void testRenameTimestampWithOverTimeFunction() {
161161
""");
162162
}
163163

164-
public void testRenameTimestampWithOverTimeFunctionWithTbucket() {
164+
public void testTbucketWithRename() {
165165
assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled());
166166
LogicalPlan plan = planK8s("""
167167
TS k8s

0 commit comments

Comments
 (0)