
Commit f974070

Do not pack non-dimension fields in TS (#138929)
We accidentally packed non-dimension fields when translating TS stats in #136216. Relates #136216
1 parent 406bcf0
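For example, with project mapped as a plain keyword field rather than a time_series_dimension, the new tests below exercise queries such as:

    TS host* | STATS avg(avg_over_time(cpu)) BY project
    TS host* | STATS avg(avg_over_time(cpu)) BY project, cluster
    TS host* | STATS avg(avg_over_time(cpu)) BY tbucket(1minute), project, cluster

Before this fix, such non-dimension grouping keys were routed through the PackDimension/UnpackDimension path introduced in #136216; they are now grouped directly on their VALUES aggregate.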

File tree

3 files changed: +190 -25 lines changed

docs/changelog/138929.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 138929
+summary: Do not pack non-dimension fields in TS
+area: ES|QL
+type: bug
+issues: []

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 157 additions & 3 deletions

@@ -9,22 +9,27 @@
 
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.OperatorStatus;
 import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +58,8 @@ public void testEmpty() {
             .setMapping(
                 "@timestamp",
                 "type=date",
+                "project",
+                "type=keyword",
                 "host",
                 "type=keyword,time_series_dimension=true",
                 "cpu",
@@ -62,7 +69,15 @@ public void testEmpty() {
         run("TS empty_index | LIMIT 1").close();
     }
 
-    record Doc(String host, String cluster, long timestamp, int requestCount, double cpu, ByteSizeValue memory) {}
+    record Doc(
+        Collection<String> project,
+        String host,
+        String cluster,
+        long timestamp,
+        int requestCount,
+        double cpu,
+        ByteSizeValue memory
+    ) {}
 
     final List<Doc> docs = new ArrayList<>();
 
@@ -98,6 +113,8 @@ public void populateIndex() {
             .setMapping(
                 "@timestamp",
                 "type=date",
+                "project",
+                "type=keyword",
                 "host",
                 "type=keyword,time_series_dimension=true",
                 "cluster",
@@ -118,6 +135,7 @@ public void populateIndex() {
         int numDocs = between(20, 100);
         docs.clear();
         Map<String, Integer> requestCounts = new HashMap<>();
+        List<String> allProjects = List.of("project-1", "project-2", "project-3");
         for (int i = 0; i < numDocs; i++) {
             List<String> hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet());
             timestamp += between(1, 10) * 1000L;
@@ -131,7 +149,8 @@ public void populateIndex() {
                 });
                 int cpu = randomIntBetween(0, 100);
                 ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-                docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
+                List<String> projects = randomSubsetOf(between(1, 3), allProjects);
+                docs.add(new Doc(projects, host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
             }
         }
         Randomness.shuffle(docs);
@@ -140,6 +159,8 @@ public void populateIndex() {
                 .setSource(
                     "@timestamp",
                     doc.timestamp,
+                    "project",
+                    doc.project,
                     "host",
                     doc.host,
                     "cluster",
@@ -672,7 +693,8 @@ public void testNullMetricsAreSkipped() {
                 });
                 int cpu = randomIntBetween(0, 100);
                 ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-                sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
+                String project = randomFrom("project-1", "project-2", "project-3");
+                sparseDocs.add(new Doc(List.of(project), host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
             }
 
         Randomness.shuffle(sparseDocs);
@@ -729,4 +751,136 @@ public void testTSIDMetadataAttribute() {
         }
     }
 
+    public void testGroupByProject() {
+        record TimeSeries(String cluster, String host) {
+
+        }
+        record Sample(int count, double sum) {
+
+        }
+        Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
+        for (Doc doc : docs) {
+            TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host);
+            buckets.compute(timeSeries, (k, v) -> {
+                if (v == null) {
+                    return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
+                } else {
+                    Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
+                    return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
+                }
+            });
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project")) {
+            Map<String, Integer> countPerProject = new HashMap<>();
+            Map<String, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    countPerProject.merge(project, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(project, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                String project = (String) r.get(1);
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(project) / countPerProject.get(project);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, cluster")) {
+            record Key(String project, String cluster) {
+
+            }
+            Map<Key, Integer> countPerProject = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().cluster);
+                    countPerProject.merge(key, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(1), (String) r.get(2));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+    }
+
+    public void testGroupByProjectAndTBucket() {
+        record TimeSeries(String cluster, String host, String tbucket) {
+
+        }
+        record Sample(int count, double sum) {
+
+        }
+        Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
+        var rounding = new Rounding.Builder(TimeValue.timeValueMillis(TimeValue.timeValueMinutes(1).millis())).build().prepareForUnknown();
+        for (Doc doc : docs) {
+            var tbucket = DEFAULT_DATE_TIME_FORMATTER.formatMillis(rounding.round(doc.timestamp));
+            TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host, tbucket);
+            buckets.compute(timeSeries, (k, v) -> {
+                if (v == null) {
+                    return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
+                } else {
+                    Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
+                    return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
+                }
+            });
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, tbucket(1minute)")) {
+            record Key(String project, String tbucket) {
+
+            }
+            Map<Key, Integer> countPerProject = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().tbucket);
+                    countPerProject.merge(key, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(1), (String) r.get(2));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY tbucket(1minute), project, cluster")) {
+            record Key(String project, String cluster, String tbucket) {
+
+            }
+            Map<Key, Integer> countPerProject = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().cluster, e.getKey().tbucket);
+                    countPerProject.merge(key, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(2), (String) r.get(3), (String) r.get(1));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+    }
+
 }
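
To make the expectations in these tests concrete: the test first collapses the documents into one bucket per (cluster, host) time series, recording the count and sum of cpu plus the set of projects observed in that series, and then averages the per-series averages for every project. A minimal standalone sketch of that expectation logic, using hypothetical names (Doc, SeriesKey, expectedAvgPerProject) that are not part of the actual test, might look like this:

// Sketch only: mirrors how testGroupByProject derives expected per-project averages.
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ExpectedAvgPerProjectSketch {
    // Hypothetical stand-in for the test's Doc record.
    record Doc(List<String> project, String host, String cluster, double cpu) {}

    record SeriesKey(String cluster, String host) {}

    static Map<String, Double> expectedAvgPerProject(List<Doc> docs) {
        // First pass: per-series count/sum of cpu and the union of projects seen in that series.
        Map<SeriesKey, double[]> stats = new HashMap<>();      // [count, sum]
        Map<SeriesKey, Set<String>> projects = new HashMap<>();
        for (Doc doc : docs) {
            SeriesKey key = new SeriesKey(doc.cluster(), doc.host());
            double[] s = stats.computeIfAbsent(key, k -> new double[2]);
            s[0] += 1;
            s[1] += doc.cpu();
            projects.computeIfAbsent(key, k -> new HashSet<>()).addAll(doc.project());
        }
        // Second pass: attribute each per-series average to every project it touched,
        // then average those per-series averages per project.
        Map<String, Integer> count = new HashMap<>();
        Map<String, Double> sumOfAvg = new HashMap<>();
        for (var e : stats.entrySet()) {
            double seriesAvg = e.getValue()[1] / e.getValue()[0];
            for (String project : projects.get(e.getKey())) {
                count.merge(project, 1, Integer::sum);
                sumOfAvg.merge(project, seriesAvg, Double::sum);
            }
        }
        Map<String, Double> expected = new HashMap<>();
        sumOfAvg.forEach((p, sum) -> expected.put(p, sum / count.get(p)));
        return expected;
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(
            new Doc(List.of("project-1"), "host-1", "cluster-a", 10),
            new Doc(List.of("project-1", "project-2"), "host-1", "cluster-a", 30),
            new Doc(List.of("project-2"), "host-2", "cluster-a", 50)
        );
        // host-1 series avg = 20 (projects 1 and 2); host-2 series avg = 50 (project 2)
        // => project-1: 20.0, project-2: (20 + 50) / 2 = 35.0
        System.out.println(expectedAvgPerProject(docs));
    }
}

Running main prints 20.0 for project-1 and 35.0 for project-2, matching the hand-computed per-series averages in the comments.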

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 28 additions & 22 deletions

@@ -300,7 +300,9 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
             }
         });
         NamedExpression timeBucket = timeBucketRef.get();
-        for (var group : aggregate.groupings()) {
+        boolean[] packPositions = new boolean[aggregate.groupings().size()];
+        for (int i = 0; i < aggregate.groupings().size(); i++) {
+            var group = aggregate.groupings().get(i);
             if (group instanceof Attribute == false) {
                 throw new EsqlIllegalArgumentException("expected named expression for grouping; got " + group);
             }
@@ -312,21 +314,26 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
             } else {
                 var valuesAgg = new Alias(g.source(), g.name(), valuesAggregate(context, g));
                 firstPassAggs.add(valuesAgg);
-                Alias pack = new Alias(
-                    g.source(),
-                    internalNames.next("pack" + g.name()),
-                    new PackDimension(g.source(), valuesAgg.toAttribute())
-                );
-                packDimensions.add(pack);
-                Alias grouping = new Alias(g.source(), internalNames.next("group" + g.name()), pack.toAttribute());
-                secondPassGroupings.add(grouping);
-                Alias unpack = new Alias(
-                    g.source(),
-                    g.name(),
-                    new UnpackDimension(g.source(), grouping.toAttribute(), g.dataType().noText()),
-                    g.id()
-                );
-                unpackDimensions.add(unpack);
+                if (g.isDimension()) {
+                    Alias pack = new Alias(
+                        g.source(),
+                        internalNames.next("pack_" + g.name()),
+                        new PackDimension(g.source(), valuesAgg.toAttribute())
+                    );
+                    packDimensions.add(pack);
+                    Alias grouping = new Alias(g.source(), internalNames.next("group_" + g.name()), pack.toAttribute());
+                    secondPassGroupings.add(grouping);
+                    Alias unpack = new Alias(
+                        g.source(),
+                        g.name(),
+                        new UnpackDimension(g.source(), grouping.toAttribute(), g.dataType().noText()),
+                        g.id()
+                    );
+                    unpackDimensions.add(unpack);
+                    packPositions[i] = true;
+                } else {
+                    secondPassGroupings.add(new Alias(g.source(), g.name(), valuesAgg.toAttribute(), g.id()));
+                }
             }
         }
         LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
@@ -365,13 +372,12 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
         for (NamedExpression agg : secondPassAggs) {
             projects.add(Expressions.attribute(agg));
         }
-        int pos = 0;
-        for (Expression group : secondPassGroupings) {
-            Attribute g = Expressions.attribute(group);
-            if (timeBucket != null && g.id().equals(timeBucket.id())) {
-                projects.add(g);
+        int packPos = 0;
+        for (int i = 0; i < secondPassGroupings.size(); i++) {
+            if (packPositions[i]) {
+                projects.add(unpackDimensions.get(packPos++).toAttribute());
             } else {
-                projects.add(unpackDimensions.get(pos++).toAttribute());
+                projects.add(Expressions.attribute(secondPassGroupings.get(i)));
             }
         }
         return new Project(newChild.source(), unpackValues, projects);
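
The shape of the fix in this rule: every grouping still gets a VALUES aggregate in the first pass, but only groupings that are time-series dimensions go through the pack, group, unpack chain, with packPositions[i] recording which positions were packed so the final projection can substitute the unpacked attribute at exactly those positions and pass every other grouping (the time bucket or a plain field such as project) through unchanged. A minimal, runnable sketch of just that projection bookkeeping, using hypothetical stand-in types rather than the optimizer's real Alias/Attribute classes, might look like:

// Sketch only: simulates the packPositions bookkeeping with plain strings.
import java.util.ArrayList;
import java.util.List;

public class ProjectionOrderSketch {
    record Grouping(String name, boolean isDimension) {}

    static List<String> projectGroupings(List<Grouping> groupings) {
        boolean[] packPositions = new boolean[groupings.size()];
        List<String> unpacked = new ArrayList<>();
        for (int i = 0; i < groupings.size(); i++) {
            Grouping g = groupings.get(i);
            if (g.isDimension()) {
                // Dimension: values -> pack -> group -> unpack; remember the position.
                unpacked.add("unpack_" + g.name());
                packPositions[i] = true;
            }
            // Non-dimension groupings are grouped on their VALUES aggregate directly.
        }
        List<String> projects = new ArrayList<>();
        int packPos = 0;
        for (int i = 0; i < groupings.size(); i++) {
            if (packPositions[i]) {
                projects.add(unpacked.get(packPos++));   // restore the dimension at its original position
            } else {
                projects.add(groupings.get(i).name());   // e.g. tbucket or a plain keyword field
            }
        }
        return projects;
    }

    public static void main(String[] args) {
        // Mirrors: TS ... | STATS ... BY tbucket(1minute), project, cluster
        // where cluster is a dimension and project is a plain keyword field.
        System.out.println(projectGroupings(List.of(
            new Grouping("tbucket", false),
            new Grouping("project", false),
            new Grouping("cluster", true)
        )));
        // -> [tbucket, project, unpack_cluster]
    }
}

Before the fix, the projection loop assumed every non-time-bucket grouping had an unpacked counterpart, which broke once a non-dimension field appeared among the groupings.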
