Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/136062.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 136062
summary: "ES|QL: Fix bug when renaming @timestamp in TS queries"
area: ES|QL
type: bug
issues:
- 134994
Original file line number Diff line number Diff line change
Expand Up @@ -559,3 +559,26 @@ cnt:long | cluster:keyword | pod:keyword
1 | prod | two
1 | prod | three
;

Max of Rate with Bucket
required_capability: ts_command_v0

TS k8s
| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(@timestamp, 1hour)
;

maxRate:double | tbucket:datetime
0.058979885057471274 | 2024-05-10T00:00:00.000Z
;

Max of Rate with Bucket, Rename Timestamp
required_capability: ts_command_v0

TS k8s
| RENAME `@timestamp` AS newTs
| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(newTs, 1hour)
;

maxRate:double | tbucket:datetime
0.058979885057471274 | 2024-05-10T00:00:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -488,10 +488,15 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
// Gather all the children's output in case of non-unary plans; even for unaries, we need to copy because we may mutate this to
// simplify resolution of e.g. RENAME.
for (LogicalPlan child : plan.children()) {
var output = child.output();
List<Attribute> output = child.output();
childrenOutput.addAll(output);
}

if (plan instanceof TimeSeriesAggregate tsAggregate) {
// NOTE: This MUST be checked before the Aggregate version, since TimeSeriesAggregate is a subclass of Aggregate
return resolveTimeSeriesAggregate(tsAggregate, childrenOutput);
}

if (plan instanceof Aggregate aggregate) {
return resolveAggregate(aggregate, childrenOutput);
}
Expand Down Expand Up @@ -551,6 +556,63 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
}

/**
 * Resolves the implicit {@code @timestamp} attribute that time-series aggregate functions rely on.
 * <p>
 * If an attribute named {@code @timestamp} is present in the children's output, it is attached to the
 * {@link TimeSeriesAggregate} directly. Otherwise the timestamp must have been renamed upstream (e.g. via
 * {@code RENAME}), so we look for an {@link Alias} of {@code @timestamp} to discover its new name, rewrite any
 * {@link UnresolvedAttribute} references to {@code @timestamp} in the groupings and aggregates to use that new
 * name, and attach the aliased attribute instead. In both cases resolution is then delegated to
 * {@code resolveAggregate}, as for any other aggregate.
 *
 * @param timeSeriesAggregate the time-series aggregate node being resolved
 * @param childrenOutput the combined output attributes of the node's children
 * @return the aggregate with its timestamp attribute populated and its expressions resolved as far as possible
 */
private TimeSeriesAggregate resolveTimeSeriesAggregate(TimeSeriesAggregate timeSeriesAggregate, List<Attribute> childrenOutput) {
    // Look for a directly-visible @timestamp attribute in the children's output.
    Attribute tsAttr = null;
    for (Attribute attribute : childrenOutput) {
        if (attribute.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
            tsAttr = attribute;
            break;
        }
    }
    if (tsAttr != null) {
        timeSeriesAggregate = timeSeriesAggregate.with(timeSeriesAggregate.groupings(), timeSeriesAggregate.aggregates(), tsAttr);
    } else {
        // @timestamp is not in the children's output: it must have been renamed. Find the alias to learn its new name.
        Holder<String> tsAttributeName = new Holder<>(MetadataAttribute.TIMESTAMP_FIELD);
        Holder<Attribute> tsAttribute = new Holder<>();
        timeSeriesAggregate.forEachExpressionUp(Alias.class, a -> {
            // NOTE(review): assumes a rename of @timestamp always aliases a plain Attribute — confirm this holds
            // for every plan shape that can rename the timestamp.
            if (a.child() instanceof Attribute c && c.name().equals(tsAttributeName.get())) {
                tsAttributeName.set(a.name());
                tsAttribute.set(a.toAttribute());
            }
        });

        // Now we know what the timestamp is going to be called; replace UnresolvedAttributes referencing
        // @timestamp with ones referencing that name.
        // TODO: Can we just resolve these here? we have the attribute
        String newName = tsAttributeName.get();
        List<Expression> newGroupings = new ArrayList<>(timeSeriesAggregate.groupings().size());
        for (Expression grouping : timeSeriesAggregate.groupings()) {
            newGroupings.add(grouping.transformUp(UnresolvedAttribute.class, ua -> renameTimestampReference(ua, newName)));
        }

        List<NamedExpression> newAggregates = new ArrayList<>(timeSeriesAggregate.aggregates().size());
        for (NamedExpression aggregate : timeSeriesAggregate.aggregates()) {
            newAggregates.add(
                (NamedExpression) aggregate.transformUp(UnresolvedAttribute.class, ua -> renameTimestampReference(ua, newName))
            );
        }
        timeSeriesAggregate = timeSeriesAggregate.with(newGroupings, newAggregates, tsAttribute.get());
    }

    // After correcting the timestamps, we still need to resolve the node as normal, so delegate to resolveAggregate
    return (TimeSeriesAggregate) resolveAggregate(timeSeriesAggregate, childrenOutput);
}

/** Replaces an unresolved reference to {@code @timestamp} with one referencing the given (renamed) attribute name. */
private static UnresolvedAttribute renameTimestampReference(UnresolvedAttribute ua, String newName) {
    return ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD) ? new UnresolvedAttribute(ua.source(), newName) : ua;
}

private Aggregate resolveAggregate(Aggregate aggregate, List<Attribute> childrenOutput) {
// if the grouping is resolved but the aggs are not, use the former to resolve the latter
// e.g. STATS a ... GROUP BY a = x + 1
Expand Down Expand Up @@ -1083,7 +1145,7 @@ private Attribute resolveAttribute(UnresolvedAttribute ua, List<Attribute> child

private static Attribute resolveAttribute(UnresolvedAttribute ua, List<Attribute> childrenOutput, Logger logger) {
Attribute resolved = ua;
var named = resolveAgainstList(ua, childrenOutput);
List<Attribute> named = resolveAgainstList(ua, childrenOutput);
// if resolved, return it; otherwise keep it in place to be resolved later
if (named.size() == 1) {
resolved = named.get(0);
Expand Down Expand Up @@ -1253,9 +1315,9 @@ public static List<NamedExpression> projectionsForRename(Rename rename, List<Att
projections.removeIf(x -> x.name().equals(alias.name()));
childrenOutput.removeIf(x -> x.name().equals(alias.name()));

var resolved = maybeResolveAttribute(ua, childrenOutput, logger);
Attribute resolved = maybeResolveAttribute(ua, childrenOutput, logger);
if (resolved instanceof UnsupportedAttribute || resolved.resolved()) {
var realiased = (NamedExpression) alias.replaceChildren(List.of(resolved));
NamedExpression realiased = alias.replaceChildren(List.of(resolved));
projections.replaceAll(x -> x.equals(resolved) ? realiased : x);
childrenOutput.removeIf(x -> x.equals(resolved));
reverseAliasing.put(resolved.name(), alias.name());
Expand Down Expand Up @@ -1356,7 +1418,7 @@ private static List<Attribute> resolveAgainstList(UnresolvedNamePattern up, Coll
}

/** Resolves the attribute against the candidate list, applying special-field handling, with candidate suggestions on failure. */
private static List<Attribute> resolveAgainstList(UnresolvedAttribute ua, Collection<Attribute> attrList) {
    List<Attribute> matches = AnalyzerRules.maybeResolveAgainstList(ua, attrList, a -> Analyzer.handleSpecialFields(ua, a));
    return potentialCandidatesIfNoMatchesFound(ua, matches, attrList, list -> UnresolvedAttribute.errorMessage(ua.name(), list));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static AttributeMap<Expression> foldableReferences(LogicalPlan plan, Logi

// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
Expression c = a.child();
boolean shouldCollect = c.foldable();
// try to resolve the expression based on an existing foldables
if (shouldCollect == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
tsid.set(attr);
}
if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
timestamp.set(attr);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider it a happy bonus of this PR that we are no longer fishing out the timestamp as part of the translate rule. I wonder if we should do something similar for the TSID. It just seems wrong to be doing this here.

}
});
if (tsid.get() == null) {
tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false));
}
if (timestamp.get() == null) {
if (aggregate.timestamp() == null) {
throw new IllegalArgumentException("_tsid or @timestamp field are missing from the time-series source");
}
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
Expand Down Expand Up @@ -216,7 +213,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
} else {
// TODO: reject over_time_aggregation only
final Expression aggField = af.field();
var tsAgg = new LastOverTime(af.source(), aggField, timestamp.get());
var tsAgg = new LastOverTime(af.source(), aggField, aggregate.timestamp());
final AggregateFunction firstStageFn;
if (inlineFilter != null) {
firstStageFn = tsAgg.perTimeSeriesAggregation().withFilter(inlineFilter);
Expand Down Expand Up @@ -247,12 +244,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
Holder<NamedExpression> timeBucketRef = new Holder<>();
aggregate.child().forEachExpressionUp(NamedExpression.class, e -> {
for (Expression child : e.children()) {
if (child instanceof Bucket bucket && bucket.field().equals(timestamp.get())) {
if (child instanceof Bucket bucket && bucket.field().equals(aggregate.timestamp())) {
if (timeBucketRef.get() != null) {
throw new IllegalArgumentException("expected at most one time bucket");
}
timeBucketRef.set(e);
} else if (child instanceof TBucket tbucket && tbucket.field().equals(timestamp.get())) {
} else if (child instanceof TBucket tbucket && tbucket.field().equals(aggregate.timestamp())) {
if (timeBucketRef.get() != null) {
throw new IllegalArgumentException("expected at most one time tbucket");
}
Expand Down Expand Up @@ -296,7 +293,8 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
newChild,
firstPassGroupings,
mergeExpressions(firstPassAggs, firstPassGroupings),
(Bucket) Alias.unwrap(timeBucket)
(Bucket) Alias.unwrap(timeBucket),
aggregate.timestamp()
);
return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
return input -> {
if (input.anyMatch(p -> p instanceof Aggregate) == false
&& input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null);
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null, null);
} else {
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ public <E extends Expression> void forEachExpressionDown(Class<? extends E> type
forEachPropertyDown(Object.class, e -> doForEachExpression(e, exp -> exp.forEachDown(typeToken, rule)));
}

/**
* This traverses all {@link Expression} nodes for all children of the current node, applying the given function to each of them.
 * It does not transform (i.e. replace) those nodes; it just hands them to the consumer, which can read but not modify them
 * (the nodes are immutable).
*
* @param typeToken Only process expressions matching the given type
* @param rule a non-modifying consumer which operates on the given token type
* @param <E> the type of expression this pass will process
*/
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to ask @fang-xing-esql how this worked, so I've written down what I learned from that conversation.

public <E extends Expression> void forEachExpressionUp(Class<E> typeToken, Consumer<? super E> rule) {
    // Walk every property bottom-up and hand each matching expression node to the consumer.
    forEachPropertyUp(Object.class, property -> doForEachExpression(property, expression -> expression.forEachUp(typeToken, rule)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
Expand Down Expand Up @@ -41,21 +42,30 @@ public class TimeSeriesAggregate extends Aggregate {
);

private final Bucket timeBucket;
/**
* this field is used by {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate} to help with
* resolving the timestamp field, but should not be needed after the initial logical planning on the coordinating node. As such,
* it is not serialized.
*/
private final Attribute timestamp;

/**
 * Creates a time-series aggregate.
 *
 * @param source source location of this plan node
 * @param child the input plan
 * @param groupings the grouping expressions
 * @param aggregates the aggregate expressions
 * @param timeBucket the time bucket applied to the timestamp, or {@code null}
 * @param timestamp the (possibly renamed) timestamp attribute; may be {@code null} — see the field comment, it is
 *        only used during logical planning on the coordinating node and is not serialized
 */
public TimeSeriesAggregate(
    Source source,
    LogicalPlan child,
    List<Expression> groupings,
    List<? extends NamedExpression> aggregates,
    Bucket timeBucket,
    Attribute timestamp
) {
    super(source, child, groupings, aggregates);
    this.timeBucket = timeBucket;
    this.timestamp = timestamp;
}

/**
 * Deserialization constructor. {@code timestamp} is deliberately left {@code null}: per the field comment it is
 * only needed during the initial logical planning on the coordinating node and is not serialized.
 */
public TimeSeriesAggregate(StreamInput in) throws IOException {
    super(in);
    this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp));
    this.timestamp = null;
}

@Override
Expand All @@ -71,17 +81,21 @@ public String getWriteableName() {

@Override
protected NodeInfo<Aggregate> info() {
    // Include timestamp so tree transformations recreate the node with it preserved.
    return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket, timestamp);
}

@Override
public TimeSeriesAggregate replaceChild(LogicalPlan newChild) {
    // Carry the timestamp attribute over to the copy with the new child.
    return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket, timestamp);
}

@Override
public TimeSeriesAggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
    // Carry the timestamp attribute over to the copy with the new child/groupings/aggregates.
    return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket, timestamp);
}

/**
 * Returns a copy of this node with the given groupings, aggregates and timestamp attribute, keeping the same
 * child and time bucket.
 */
public TimeSeriesAggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates, Attribute newTimestamp) {
    return new TimeSeriesAggregate(source(), child(), newGroupings, newAggregates, timeBucket, newTimestamp);
}

@Override
Expand All @@ -94,6 +108,10 @@ public Bucket timeBucket() {
return timeBucket;
}

/** The (possibly renamed) timestamp attribute; may be {@code null}, e.g. after deserialization (see field comment). */
public Attribute timestamp() {
    return timestamp;
}

@Override
public int hashCode() {
return Objects.hash(groupings, aggregates, child(), timeBucket);
Expand Down
Loading