6 changes: 6 additions & 0 deletions docs/changelog/136062.yaml
@@ -0,0 +1,6 @@
pr: 136062
summary: ESQL Fix bug when renaming @timestamp in TS queries
area: ES|QL
type: bug
issues:
- 134994
@@ -559,3 +559,26 @@ cnt:long | cluster:keyword | pod:keyword
1 | prod | two
1 | prod | three
;

Max of Rate with Bucket
required_capability: ts_command_v0

TS k8s
| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(@timestamp, 1hour)
;

maxRate:double | tbucket:datetime
0.058979885057471274 | 2024-05-10T00:00:00.000Z
;

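// Regression test for the bug this PR fixes (issue 134994): bucket() must still find the timestamp after it has been renamed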
Max of Rate with Bucket, Rename Timestamp
required_capability: ts_command_v0

TS k8s
| RENAME `@timestamp` AS newTs
| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(newTs, 1hour)
;

maxRate:double | tbucket:datetime
0.058979885057471274 | 2024-05-10T00:00:00.000Z
;
@@ -488,10 +488,15 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
// Gather all the children's output in case of non-unary plans; even for unaries, we need to copy because we may mutate this to
// simplify resolution of e.g. RENAME.
for (LogicalPlan child : plan.children()) {
var output = child.output();
List<Attribute> output = child.output();
childrenOutput.addAll(output);
}

if (plan instanceof TimeSeriesAggregate tsAggregate) {
// NOTE: This MUST be checked before the Aggregate version, since TimeSeriesAggregate is a subclass of Aggregate
return resolveTimeSeriesAggregate(tsAggregate, childrenOutput);
}

if (plan instanceof Aggregate aggregate) {
return resolveAggregate(aggregate, childrenOutput);
}
@@ -551,6 +556,63 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
}

/**
* Resolves the implicit {@code @timestamp} attribute that some TS functions rely on, including the case where it has been renamed (e.g. via RENAME) before the aggregation.
*/
private TimeSeriesAggregate resolveTimeSeriesAggregate(TimeSeriesAggregate timeSeriesAggregate, List<Attribute> childrenOutput) {
Attribute tsAttr = null;
for (int i = 0; i < childrenOutput.size(); i++) {
if (childrenOutput.get(i).name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
tsAttr = childrenOutput.get(i);
break;
}
}
if (tsAttr == null) {
// we didn't find @timestamp in the children's output, so it may have been renamed; look for an alias of it
Holder<String> tsAttributeName = new Holder<>(MetadataAttribute.TIMESTAMP_FIELD);
Holder<Attribute> tsAttribute = new Holder<>();
timeSeriesAggregate.forEachExpressionUp(Alias.class, a -> {
if (a.child() instanceof Attribute c) {
// will this ever not be true?
if (c.name().equals(tsAttributeName.get())) {
tsAttributeName.set(a.name());
tsAttribute.set(a.toAttribute());
}
}
});

// Now that we know what the timestamp is called, replace our UnresolvedAttributes that reference @timestamp with that name
List<Expression> newGroupings = new ArrayList<>(timeSeriesAggregate.groupings().size());
List<NamedExpression> newAggregates = new ArrayList<>(timeSeriesAggregate.aggregates().size());
// TODO: Can we just resolve these here? We already have the attribute.
for (int i = 0; i < timeSeriesAggregate.groupings().size(); i++) {
newGroupings.add(timeSeriesAggregate.groupings().get(i).transformUp(UnresolvedAttribute.class, ua -> {
if (ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
return new UnresolvedAttribute(ua.source(), tsAttributeName.get());
}
return ua;
}));
}

for (int i = 0; i < timeSeriesAggregate.aggregates().size(); i++) {
newAggregates.add(
(NamedExpression) timeSeriesAggregate.aggregates().get(i).transformUp(UnresolvedAttribute.class, ua -> {
if (ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
return new UnresolvedAttribute(ua.source(), tsAttributeName.get());
}
return ua;
})
);
}
timeSeriesAggregate = timeSeriesAggregate.with(newGroupings, newAggregates, tsAttribute.get());
} else {
timeSeriesAggregate = timeSeriesAggregate.with(timeSeriesAggregate.groupings(), timeSeriesAggregate.aggregates(), tsAttr);
}

// After correcting the timestamps, we still need to resolve the node as normal, so delegate to resolveAggregate
return (TimeSeriesAggregate) resolveAggregate(timeSeriesAggregate, childrenOutput);
}

private Aggregate resolveAggregate(Aggregate aggregate, List<Attribute> childrenOutput) {
// if the grouping is resolved but the aggs are not, use the former to resolve the latter
// e.g. STATS a ... GROUP BY a = x + 1
@@ -1083,7 +1145,7 @@ private Attribute resolveAttribute(UnresolvedAttribute ua, List<Attribute> child

private static Attribute resolveAttribute(UnresolvedAttribute ua, List<Attribute> childrenOutput, Logger logger) {
Attribute resolved = ua;
var named = resolveAgainstList(ua, childrenOutput);
List<Attribute> named = resolveAgainstList(ua, childrenOutput);
// if resolved, return it; otherwise keep it in place to be resolved later
if (named.size() == 1) {
resolved = named.get(0);
@@ -1253,9 +1315,9 @@ public static List<NamedExpression> projectionsForRename(Rename rename, List<Att
projections.removeIf(x -> x.name().equals(alias.name()));
childrenOutput.removeIf(x -> x.name().equals(alias.name()));

var resolved = maybeResolveAttribute(ua, childrenOutput, logger);
Attribute resolved = maybeResolveAttribute(ua, childrenOutput, logger);
if (resolved instanceof UnsupportedAttribute || resolved.resolved()) {
var realiased = (NamedExpression) alias.replaceChildren(List.of(resolved));
NamedExpression realiased = alias.replaceChildren(List.of(resolved));
projections.replaceAll(x -> x.equals(resolved) ? realiased : x);
childrenOutput.removeIf(x -> x.equals(resolved));
reverseAliasing.put(resolved.name(), alias.name());
@@ -1356,7 +1418,7 @@ private static List<Attribute> resolveAgainstList(UnresolvedNamePattern up, Coll
}

private static List<Attribute> resolveAgainstList(UnresolvedAttribute ua, Collection<Attribute> attrList) {
var matches = AnalyzerRules.maybeResolveAgainstList(ua, attrList, a -> Analyzer.handleSpecialFields(ua, a));
List<Attribute> matches = AnalyzerRules.maybeResolveAgainstList(ua, attrList, a -> Analyzer.handleSpecialFields(ua, a));
return potentialCandidatesIfNoMatchesFound(ua, matches, attrList, list -> UnresolvedAttribute.errorMessage(ua.name(), list));
}

@@ -87,7 +87,7 @@ public static AttributeMap<Expression> foldableReferences(LogicalPlan plan, Logi

// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
Expression c = a.child();
boolean shouldCollect = c.foldable();
// try to resolve the expression based on an existing foldables
if (shouldCollect == false) {
@@ -165,15 +165,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
tsid.set(attr);
}
if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
timestamp.set(attr);
}
Member Author commented:
I consider it a happy bonus of this PR that we are no longer fishing out the timestamp as part of the translate rule. I wonder if we should do something similar for the TSID. It just seems wrong to be doing this here.

}
});
if (tsid.get() == null) {
tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false));
}
if (timestamp.get() == null) {
if (aggregate.timestamp() == null) {
throw new IllegalArgumentException("_tsid or @timestamp field are missing from the time-series source");
}
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
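Picking up the review note above about the TSID: the following is a rough, purely illustrative sketch (not part of this diff) of how _tsid could be resolved up front in the analyzer's new resolveTimeSeriesAggregate and carried on the plan node, the same way this change now handles @timestamp. The withTsid method is hypothetical; only the MetadataAttribute construction mirrors the existing translate rule.

// Hypothetical sketch only: resolve _tsid in the analyzer, as resolveTimeSeriesAggregate
// now does for @timestamp, so the translate rule no longer has to fish it out of the child output.
Attribute tsidAttr = null;
for (Attribute attr : childrenOutput) {
    if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
        tsidAttr = attr;
        break;
    }
}
if (tsidAttr == null) {
    // fall back to a synthetic metadata attribute, mirroring what translate() does today
    tsidAttr = new MetadataAttribute(timeSeriesAggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false);
}
// timeSeriesAggregate = timeSeriesAggregate.withTsid(tsidAttr); // withTsid does not exist yet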
@@ -216,7 +213,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
} else {
// TODO: reject over_time_aggregation only
final Expression aggField = af.field();
var tsAgg = new LastOverTime(af.source(), aggField, timestamp.get());
var tsAgg = new LastOverTime(af.source(), aggField, aggregate.timestamp());
final AggregateFunction firstStageFn;
if (inlineFilter != null) {
firstStageFn = tsAgg.perTimeSeriesAggregation().withFilter(inlineFilter);
@@ -247,12 +244,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
Holder<NamedExpression> timeBucketRef = new Holder<>();
aggregate.child().forEachExpressionUp(NamedExpression.class, e -> {
for (Expression child : e.children()) {
if (child instanceof Bucket bucket && bucket.field().equals(timestamp.get())) {
if (child instanceof Bucket bucket && bucket.field().equals(aggregate.timestamp())) {
if (timeBucketRef.get() != null) {
throw new IllegalArgumentException("expected at most one time bucket");
}
timeBucketRef.set(e);
} else if (child instanceof TBucket tbucket && tbucket.field().equals(timestamp.get())) {
} else if (child instanceof TBucket tbucket && tbucket.field().equals(aggregate.timestamp())) {
if (timeBucketRef.get() != null) {
throw new IllegalArgumentException("expected at most one time tbucket");
}
@@ -296,7 +293,8 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
newChild,
firstPassGroupings,
mergeExpressions(firstPassAggs, firstPassGroupings),
(Bucket) Alias.unwrap(timeBucket)
(Bucket) Alias.unwrap(timeBucket),
aggregate.timestamp()
);
return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings));
}
@@ -362,7 +362,7 @@ public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
return input -> {
if (input.anyMatch(p -> p instanceof Aggregate) == false
&& input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null);
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null, null);
} else {
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
}
@@ -203,6 +203,15 @@ public <E extends Expression> void forEachExpressionDown(Class<? extends E> typ
forEachPropertyDown(Object.class, e -> doForEachExpression(e, exp -> exp.forEachDown(typeToken, rule)));
}

/**
* This traverses all {@link Expression} nodes for all children of the current node, applying the given function to each of them.
* It does not transform (i.e. replace) those nodes; it just hands them to the consumer, which can read but not modify them (since the
* nodes are immutable).
*
* @param typeToken Only process expressions matching the given type
* @param rule a non-modifying consumer applied to each expression of the given type
* @param <E> the type of expression this pass will process
*/
Member Author commented:
I had to ask @fang-xing-esql how this worked, so I've written down what I learned from that conversation.

public <E extends Expression> void forEachExpressionUp(Class<E> typeToken, Consumer<? super E> rule) {
forEachPropertyUp(Object.class, e -> doForEachExpression(e, exp -> exp.forEachUp(typeToken, rule)));
}
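As a usage sketch, drawn only from the analyzer change in this PR (with plan standing in for any query plan node): walk every Alias bottom-up and record what @timestamp has been renamed to, without mutating the plan.

// Read-only traversal: collect the alias name that @timestamp was renamed to, if any.
Holder<String> tsName = new Holder<>(MetadataAttribute.TIMESTAMP_FIELD);
plan.forEachExpressionUp(Alias.class, a -> {
    if (a.child() instanceof Attribute c && c.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
        tsName.set(a.name());
    }
});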
@@ -12,6 +12,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
@@ -41,21 +42,30 @@ public class TimeSeriesAggregate extends Aggregate {
);

private final Bucket timeBucket;
/**
* This field is used by {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate} to help with
* resolving the timestamp field, but should not be needed after the initial logical planning on the coordinating node. As such,
* it is not serialized.
*/
private final Attribute timestamp;

public TimeSeriesAggregate(
Source source,
LogicalPlan child,
List<Expression> groupings,
List<? extends NamedExpression> aggregates,
Bucket timeBucket
Bucket timeBucket,
Attribute timestamp
) {
super(source, child, groupings, aggregates);
this.timeBucket = timeBucket;
this.timestamp = timestamp;
}

public TimeSeriesAggregate(StreamInput in) throws IOException {
super(in);
this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp));
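// timestamp is intentionally not read from the wire: per the Javadoc on the field, it is only needed
// during logical planning on the coordinating node, so deserialized copies leave it null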
this.timestamp = null;
}

@Override
@@ -71,17 +81,21 @@ public String getWriteableName() {

@Override
protected NodeInfo<Aggregate> info() {
return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket);
return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket, timestamp);
}

@Override
public TimeSeriesAggregate replaceChild(LogicalPlan newChild) {
return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket);
return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket, timestamp);
}

@Override
public TimeSeriesAggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket);
return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket, timestamp);
}

public TimeSeriesAggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates, Attribute newTimestamp) {
return new TimeSeriesAggregate(source(), child(), newGroupings, newAggregates, timeBucket, newTimestamp);
}

@Override
@@ -94,6 +108,10 @@ public Bucket timeBucket() {
return timeBucket;
}

public Attribute timestamp() {
return timestamp;
}

@Override
public int hashCode() {
return Objects.hash(groupings, aggregates, child(), timeBucket);