Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ static TransportVersion def(int id) {
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
public static final TransportVersion REMOTE_EXCEPTION = def(9_044_0_00);
public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@
public class TimeSeriesAggregationOperator extends HashAggregationOperator {

public record Factory(
BlockHash.GroupSpec tsidGroup,
BlockHash.GroupSpec timestampGroup,
List<BlockHash.GroupSpec> groups,
AggregatorMode aggregatorMode,
List<GroupingAggregator.Factory> aggregators,
int maxPageSize
) implements OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
// TODO: use TimeSeriesBlockHash when possible
return new HashAggregationOperator(
return new TimeSeriesAggregationOperator(
aggregators,
() -> BlockHash.build(
List.of(tsidGroup, timestampGroup),
groups,
driverContext.blockFactory(),
maxPageSize,
true // we can enable optimizations as the inputs are vectors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected LogicalPlan rule(UnaryPlan plan) {
// project can be fully removed
if (newAggs != null) {
var newGroups = replacePrunedAliasesUsedInGroupBy(a.groupings(), aggs, newAggs);
plan = new Aggregate(a.source(), a.child(), a.aggregateType(), newGroups, newAggs);
plan = a.with(newGroups, newAggs);
}
}
return plan;
Expand All @@ -75,10 +75,8 @@ protected LogicalPlan rule(UnaryPlan plan) {
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
}
}
plan = new Aggregate(
a.source(),
plan = a.with(
p.child(),
a.aggregateType(),
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
combineProjections(a.aggregates(), p.projections())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ protected LogicalPlan rule(Aggregate aggregate) {
}

// build the new agg on top of extracted filter
return new Aggregate(
aggregate.source(),
new Filter(aggregate.source(), aggregate.child(), common.v1()),
aggregate.aggregateType(),
aggregate.groupings(),
newAggs
);
return aggregate.with(new Filter(aggregate.source(), aggregate.child(), common.v1()), aggregate.groupings(), newAggs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,10 @@ public LogicalPlan apply(LogicalPlan plan) {
} else {
// Aggs cannot produce pages with 0 columns, so retain one grouping.
remaining = List.of(Expressions.attribute(aggregate.groupings().get(0)));
p = new Aggregate(
aggregate.source(),
aggregate.child(),
aggregate.aggregateType(),
aggregate.groupings(),
remaining
);
p = aggregate.with(aggregate.groupings(), remaining);
}
} else {
p = new Aggregate(
aggregate.source(),
aggregate.child(),
aggregate.aggregateType(),
aggregate.groupings(),
remaining
);
p = aggregate.with(aggregate.groupings(), remaining);
}
}
} else if (p instanceof Eval eval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected LogicalPlan rule(Aggregate aggregate) {
if (changed) {
var source = aggregate.source();
if (newAggs.isEmpty() == false) {
plan = new Aggregate(source, aggregate.child(), aggregate.aggregateType(), aggregate.groupings(), newAggs);
plan = aggregate.with(aggregate.child(), aggregate.groupings(), newAggs);
} else {
// All aggs actually have been surrogates for (foldable) expressions, e.g.
// \_Aggregate[[],[AVG([1, 2][INTEGER]) AS s]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -123,10 +124,14 @@ public TranslateTimeSeriesAggregate() {

@Override
protected LogicalPlan rule(Aggregate aggregate) {
return translate(aggregate);
if (aggregate instanceof TimeSeriesAggregate ts) {
return translate(ts);
} else {
return aggregate;
}
}

LogicalPlan translate(Aggregate aggregate) {
LogicalPlan translate(TimeSeriesAggregate aggregate) {
Map<Rate, Alias> rateAggs = new HashMap<>();
List<NamedExpression> firstPassAggs = new ArrayList<>();
List<NamedExpression> secondPassAggs = new ArrayList<>();
Expand All @@ -153,7 +158,7 @@ LogicalPlan translate(Aggregate aggregate) {
}
}
if (rateAggs.isEmpty()) {
return aggregate;
return new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), aggregate.aggregates());
}
Holder<Attribute> tsid = new Holder<>();
Holder<Attribute> timestamp = new Holder<>();
Expand Down Expand Up @@ -204,7 +209,7 @@ LogicalPlan translate(Aggregate aggregate) {
}
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
}
LogicalPlan relation = aggregate.child().transformUp(EsRelation.class, r -> {
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
if (r.output().contains(tsid.get()) == false) {
return new EsRelation(
r.source(),
Expand All @@ -217,26 +222,19 @@ LogicalPlan translate(Aggregate aggregate) {
return r;
}
});
return newAggregate(
newAggregate(relation, Aggregate.AggregateType.TIME_SERIES, firstPassAggs, firstPassGroupings),
Aggregate.AggregateType.STANDARD,
secondPassAggs,
secondPassGroupings
final var firstPhase = new TimeSeriesAggregate(
newChild.source(),
newChild,
firstPassGroupings,
mergeExpressions(firstPassAggs, firstPassGroupings)
);
return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings));
}

private static Aggregate newAggregate(
LogicalPlan child,
Aggregate.AggregateType type,
private static List<? extends NamedExpression> mergeExpressions(
List<? extends NamedExpression> aggregates,
List<Expression> groupings
) {
return new Aggregate(
child.source(),
child,
type,
groupings,
Stream.concat(aggregates.stream(), groupings.stream().map(Expressions::attribute)).toList()
);
return Stream.concat(aggregates.stream(), groupings.stream().map(Expressions::attribute)).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
Expand Down Expand Up @@ -313,7 +314,13 @@ public PlanFactory visitInsistCommand(EsqlBaseParser.InsistCommandContext ctx) {
@Override
public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
final Stats stats = stats(source(ctx), ctx.grouping, ctx.stats);
return input -> new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, stats.groupings, stats.aggregates);
return input -> {
if (input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates);
} else {
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
}
};
}

private record Stats(List<Expression> groupings, List<? extends NamedExpression> aggregates) {}
Expand Down Expand Up @@ -363,10 +370,7 @@ public PlanFactory visitInlinestatsCommand(EsqlBaseParser.InlinestatsCommandCont
List<NamedExpression> groupings = visitGrouping(ctx.grouping);
aggregates.addAll(groupings);
// TODO: add support for filters
return input -> new InlineStats(
source(ctx),
new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, new ArrayList<>(groupings), aggregates)
);
return input -> new InlineStats(source(ctx), new Aggregate(source(ctx), input, new ArrayList<>(groupings), aggregates));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
Expand All @@ -46,6 +47,7 @@
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;

import java.util.ArrayList;
Expand Down Expand Up @@ -79,6 +81,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
MvExpand.ENTRY,
OrderBy.ENTRY,
Project.ENTRY,
TimeSeriesAggregate.ENTRY,
TopN.ENTRY
);
}
Expand All @@ -105,6 +108,7 @@ public static List<NamedWriteableRegistry.Entry> physical() {
ProjectExec.ENTRY,
ShowExec.ENTRY,
SubqueryExec.ENTRY,
TimeSeriesAggregateExec.ENTRY,
TopNExec.ENTRY
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,61 +48,35 @@ public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAwar
Aggregate::new
);

public enum AggregateType {
STANDARD,
TIME_SERIES;

static void writeType(StreamOutput out, AggregateType type) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
out.writeString(type.name());
} else if (type != STANDARD) {
throw new IllegalStateException("cluster is not ready to support aggregate type [" + type + "]");
}
}

static AggregateType readType(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
return AggregateType.valueOf(in.readString());
} else {
return STANDARD;
}
}
}

private final AggregateType aggregateType;
private final List<Expression> groupings;
private final List<? extends NamedExpression> aggregates;
protected final List<Expression> groupings;
protected final List<? extends NamedExpression> aggregates;

private List<Attribute> lazyOutput;
protected List<Attribute> lazyOutput;

public Aggregate(
Source source,
LogicalPlan child,
AggregateType aggregateType,
List<Expression> groupings,
List<? extends NamedExpression> aggregates
) {
public Aggregate(Source source, LogicalPlan child, List<Expression> groupings, List<? extends NamedExpression> aggregates) {
super(source, child);
this.aggregateType = aggregateType;
this.groupings = groupings;
this.aggregates = aggregates;
}

public Aggregate(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(LogicalPlan.class),
AggregateType.readType(in),
in.readNamedWriteableCollectionAsList(Expression.class),
in.readNamedWriteableCollectionAsList(NamedExpression.class)
);
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class));
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)
&& in.getTransportVersion().before(TransportVersions.ESQL_REMOVE_AGGREGATE_TYPE)) {
in.readString();
}
this.groupings = in.readNamedWriteableCollectionAsList(Expression.class);
this.aggregates = in.readNamedWriteableCollectionAsList(NamedExpression.class);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
out.writeNamedWriteable(child());
AggregateType.writeType(out, aggregateType());
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)
&& out.getTransportVersion().before(TransportVersions.ESQL_REMOVE_AGGREGATE_TYPE)) {
out.writeString("STANDARD");
}
out.writeNamedWriteableCollection(groupings);
out.writeNamedWriteableCollection(aggregates());
}
Expand All @@ -113,25 +87,21 @@ public String getWriteableName() {
}

@Override
protected NodeInfo<Aggregate> info() {
return NodeInfo.create(this, Aggregate::new, child(), aggregateType, groupings, aggregates);
protected NodeInfo<? extends Aggregate> info() {
return NodeInfo.create(this, Aggregate::new, child(), groupings, aggregates);
}

@Override
public Aggregate replaceChild(LogicalPlan newChild) {
return new Aggregate(source(), newChild, aggregateType, groupings, aggregates);
return new Aggregate(source(), newChild, groupings, aggregates);
}

public Aggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
return with(child(), newGroupings, newAggregates);
}

public Aggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
return new Aggregate(source(), child, aggregateType(), newGroupings, newAggregates);
}

public AggregateType aggregateType() {
return aggregateType;
return new Aggregate(source(), child, newGroupings, newAggregates);
}

public List<Expression> groupings() {
Expand All @@ -144,10 +114,7 @@ public List<? extends NamedExpression> aggregates() {

@Override
public String telemetryLabel() {
return switch (aggregateType) {
case STANDARD -> "STATS";
case TIME_SERIES -> "TIME_SERIES";
};
return "STATS";
}

@Override
Expand Down Expand Up @@ -184,7 +151,7 @@ public static AttributeSet computeReferences(List<? extends NamedExpression> agg

@Override
public int hashCode() {
return Objects.hash(aggregateType, groupings, aggregates, child());
return Objects.hash(groupings, aggregates, child());
}

@Override
Expand All @@ -198,8 +165,7 @@ public boolean equals(Object obj) {
}

Aggregate other = (Aggregate) obj;
return aggregateType == other.aggregateType
&& Objects.equals(groupings, other.groupings)
return Objects.equals(groupings, other.groupings)
&& Objects.equals(aggregates, other.aggregates)
&& Objects.equals(child(), other.child());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public UnaryPlan replaceChild(LogicalPlan newChild) {

@Override
public LogicalPlan surrogate() {
return new Aggregate(source(), child(), Aggregate.AggregateType.STANDARD, new ArrayList<>(groupings), finalAggs());
return new Aggregate(source(), child(), new ArrayList<>(groupings), finalAggs());
}

public List<NamedExpression> aggregates() {
Expand Down
Loading