Skip to content

Commit adc011a

Browse files
authored
Merge branch 'main' into feature/multi-project/health-dlm
2 parents 780ab4e + b0d7c7d commit adc011a

File tree

23 files changed

+346
-221
lines changed

23 files changed

+346
-221
lines changed

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,9 @@ tests:
395395
- class: org.elasticsearch.packaging.test.DockerTests
396396
method: test020PluginsListWithNoPlugins
397397
issue: https://github.com/elastic/elasticsearch/issues/126232
398+
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
399+
method: test {p0=transform/transforms_reset/Test force reseting a running transform}
400+
issue: https://github.com/elastic/elasticsearch/issues/126240
398401

399402
# Examples:
400403
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ static TransportVersion def(int id) {
211211
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
212212
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
213213
public static final TransportVersion REMOTE_EXCEPTION = def(9_044_0_00);
214-
public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_045_0_00);
214+
public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00);
215+
public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_046_0_00);
215216

216217
/*
217218
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,18 @@
2323
public class TimeSeriesAggregationOperator extends HashAggregationOperator {
2424

2525
public record Factory(
26-
BlockHash.GroupSpec tsidGroup,
27-
BlockHash.GroupSpec timestampGroup,
26+
List<BlockHash.GroupSpec> groups,
2827
AggregatorMode aggregatorMode,
2928
List<GroupingAggregator.Factory> aggregators,
3029
int maxPageSize
3130
) implements OperatorFactory {
3231
@Override
3332
public Operator get(DriverContext driverContext) {
3433
// TODO: use TimeSeriesBlockHash when possible
35-
return new HashAggregationOperator(
34+
return new TimeSeriesAggregationOperator(
3635
aggregators,
3736
() -> BlockHash.build(
38-
List.of(tsidGroup, timestampGroup),
37+
groups,
3938
driverContext.blockFactory(),
4039
maxPageSize,
4140
true // we can enable optimizations as the inputs are vectors

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected LogicalPlan rule(UnaryPlan plan) {
5353
// project can be fully removed
5454
if (newAggs != null) {
5555
var newGroups = replacePrunedAliasesUsedInGroupBy(a.groupings(), aggs, newAggs);
56-
plan = new Aggregate(a.source(), a.child(), a.aggregateType(), newGroups, newAggs);
56+
plan = a.with(newGroups, newAggs);
5757
}
5858
}
5959
return plan;
@@ -75,10 +75,8 @@ protected LogicalPlan rule(UnaryPlan plan) {
7575
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
7676
}
7777
}
78-
plan = new Aggregate(
79-
a.source(),
78+
plan = a.with(
8079
p.child(),
81-
a.aggregateType(),
8280
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
8381
combineProjections(a.aggregates(), p.projections())
8482
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ExtractAggregateCommonFilter.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,6 @@ protected LogicalPlan rule(Aggregate aggregate) {
6767
}
6868

6969
// build the new agg on top of extracted filter
70-
return new Aggregate(
71-
aggregate.source(),
72-
new Filter(aggregate.source(), aggregate.child(), common.v1()),
73-
aggregate.aggregateType(),
74-
aggregate.groupings(),
75-
newAggs
76-
);
70+
return aggregate.with(new Filter(aggregate.source(), aggregate.child(), common.v1()), aggregate.groupings(), newAggs);
7771
}
7872
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,22 +73,10 @@ public LogicalPlan apply(LogicalPlan plan) {
7373
} else {
7474
// Aggs cannot produce pages with 0 columns, so retain one grouping.
7575
remaining = List.of(Expressions.attribute(aggregate.groupings().get(0)));
76-
p = new Aggregate(
77-
aggregate.source(),
78-
aggregate.child(),
79-
aggregate.aggregateType(),
80-
aggregate.groupings(),
81-
remaining
82-
);
76+
p = aggregate.with(aggregate.groupings(), remaining);
8377
}
8478
} else {
85-
p = new Aggregate(
86-
aggregate.source(),
87-
aggregate.child(),
88-
aggregate.aggregateType(),
89-
aggregate.groupings(),
90-
remaining
91-
);
79+
p = aggregate.with(aggregate.groupings(), remaining);
9280
}
9381
}
9482
} else if (p instanceof Eval eval) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SubstituteSurrogates.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected LogicalPlan rule(Aggregate aggregate) {
107107
if (changed) {
108108
var source = aggregate.source();
109109
if (newAggs.isEmpty() == false) {
110-
plan = new Aggregate(source, aggregate.child(), aggregate.aggregateType(), aggregate.groupings(), newAggs);
110+
plan = aggregate.with(aggregate.child(), aggregate.groupings(), newAggs);
111111
} else {
112112
// All aggs actually have been surrogates for (foldable) expressions, e.g.
113113
// \_Aggregate[[],[AVG([1, 2][INTEGER]) AS s]]

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
2727
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
2828
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
29+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
2930

3031
import java.util.ArrayList;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
34-
import java.util.stream.Stream;
3535

3636
/**
3737
* Rate aggregation is special because it must be computed per time series, regardless of the grouping keys.
@@ -123,10 +123,14 @@ public TranslateTimeSeriesAggregate() {
123123

124124
@Override
125125
protected LogicalPlan rule(Aggregate aggregate) {
126-
return translate(aggregate);
126+
if (aggregate instanceof TimeSeriesAggregate ts) {
127+
return translate(ts);
128+
} else {
129+
return aggregate;
130+
}
127131
}
128132

129-
LogicalPlan translate(Aggregate aggregate) {
133+
LogicalPlan translate(TimeSeriesAggregate aggregate) {
130134
Map<Rate, Alias> rateAggs = new HashMap<>();
131135
List<NamedExpression> firstPassAggs = new ArrayList<>();
132136
List<NamedExpression> secondPassAggs = new ArrayList<>();
@@ -153,7 +157,8 @@ LogicalPlan translate(Aggregate aggregate) {
153157
}
154158
}
155159
if (rateAggs.isEmpty()) {
156-
return aggregate;
160+
// no time-series aggregations, run a regular aggregation instead.
161+
return new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), aggregate.aggregates());
157162
}
158163
Holder<Attribute> tsid = new Holder<>();
159164
Holder<Attribute> timestamp = new Holder<>();
@@ -204,7 +209,7 @@ LogicalPlan translate(Aggregate aggregate) {
204209
}
205210
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
206211
}
207-
LogicalPlan relation = aggregate.child().transformUp(EsRelation.class, r -> {
212+
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
208213
if (r.output().contains(tsid.get()) == false) {
209214
return new EsRelation(
210215
r.source(),
@@ -217,26 +222,22 @@ LogicalPlan translate(Aggregate aggregate) {
217222
return r;
218223
}
219224
});
220-
return newAggregate(
221-
newAggregate(relation, Aggregate.AggregateType.TIME_SERIES, firstPassAggs, firstPassGroupings),
222-
Aggregate.AggregateType.STANDARD,
223-
secondPassAggs,
224-
secondPassGroupings
225+
final var firstPhase = new TimeSeriesAggregate(
226+
newChild.source(),
227+
newChild,
228+
firstPassGroupings,
229+
mergeExpressions(firstPassAggs, firstPassGroupings)
225230
);
231+
return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings));
226232
}
227233

228-
private static Aggregate newAggregate(
229-
LogicalPlan child,
230-
Aggregate.AggregateType type,
234+
private static List<? extends NamedExpression> mergeExpressions(
231235
List<? extends NamedExpression> aggregates,
232236
List<Expression> groupings
233237
) {
234-
return new Aggregate(
235-
child.source(),
236-
child,
237-
type,
238-
groupings,
239-
Stream.concat(aggregates.stream(), groupings.stream().map(Expressions::attribute)).toList()
240-
);
238+
List<NamedExpression> merged = new ArrayList<>(aggregates.size() + groupings.size());
239+
merged.addAll(aggregates);
240+
groupings.forEach(g -> merged.add(Expressions.attribute(g)));
241+
return merged;
241242
}
242243
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.xpack.esql.plan.logical.Rename;
6767
import org.elasticsearch.xpack.esql.plan.logical.Row;
6868
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
69+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
6970
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
7071
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
7172
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
@@ -312,7 +313,13 @@ public PlanFactory visitInsistCommand(EsqlBaseParser.InsistCommandContext ctx) {
312313
@Override
313314
public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
314315
final Stats stats = stats(source(ctx), ctx.grouping, ctx.stats);
315-
return input -> new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, stats.groupings, stats.aggregates);
316+
return input -> {
317+
if (input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
318+
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates);
319+
} else {
320+
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
321+
}
322+
};
316323
}
317324

318325
private record Stats(List<Expression> groupings, List<? extends NamedExpression> aggregates) {}
@@ -362,10 +369,7 @@ public PlanFactory visitInlinestatsCommand(EsqlBaseParser.InlinestatsCommandCont
362369
List<NamedExpression> groupings = visitGrouping(ctx.grouping);
363370
aggregates.addAll(groupings);
364371
// TODO: add support for filters
365-
return input -> new InlineStats(
366-
source(ctx),
367-
new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, new ArrayList<>(groupings), aggregates)
368-
);
372+
return input -> new InlineStats(source(ctx), new Aggregate(source(ctx), input, new ArrayList<>(groupings), aggregates));
369373
}
370374

371375
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
2222
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
2323
import org.elasticsearch.xpack.esql.plan.logical.Project;
24+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
2425
import org.elasticsearch.xpack.esql.plan.logical.TopN;
2526
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
2627
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
@@ -46,6 +47,7 @@
4647
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
4748
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
4849
import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
50+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
4951
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
5052

5153
import java.util.ArrayList;
@@ -79,6 +81,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
7981
MvExpand.ENTRY,
8082
OrderBy.ENTRY,
8183
Project.ENTRY,
84+
TimeSeriesAggregate.ENTRY,
8285
TopN.ENTRY
8386
);
8487
}
@@ -105,6 +108,7 @@ public static List<NamedWriteableRegistry.Entry> physical() {
105108
ProjectExec.ENTRY,
106109
ShowExec.ENTRY,
107110
SubqueryExec.ENTRY,
111+
TimeSeriesAggregateExec.ENTRY,
108112
TopNExec.ENTRY
109113
);
110114
}

0 commit comments

Comments
 (0)