Skip to content

Commit 534cdb6

Browse files
authored
Esql Fix bug with loading TS metadata (#134648)
Relates to #133087. @dnhatn reported that we weren't seeing the performance improvement we expected from the null filters. I added tests and investigated, and it turns out I had forgotten to load the TS metadata from a single field caps response. This PR includes the tests that helped find the problem, and the fix.
1 parent 136321f commit 534cdb6

File tree

9 files changed

+196
-13
lines changed

9 files changed

+196
-13
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/LoadMapping.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,21 +100,33 @@ private static void walkMapping(String name, Object value, Map<String, EsField>
100100
properties = fromEs(content);
101101
}
102102
boolean docValues = boolSetting(content.get("doc_values"), esDataType.hasDocValues());
103+
boolean isDimension = boolSetting(content.get("time_series_dimension"), false);
104+
boolean isMetric = content.containsKey("time_series_metric");
105+
if (isDimension && isMetric) {
106+
throw new IllegalStateException("Field configured as both dimension and metric:" + value);
107+
}
108+
EsField.TimeSeriesFieldType tsType = EsField.TimeSeriesFieldType.NONE;
109+
if (isDimension) {
110+
tsType = EsField.TimeSeriesFieldType.DIMENSION;
111+
}
112+
if (isMetric) {
113+
tsType = EsField.TimeSeriesFieldType.METRIC;
114+
}
103115
final EsField field;
104116
if (esDataType == TEXT) {
105-
field = new TextEsField(name, properties, docValues, false, EsField.TimeSeriesFieldType.NONE);
117+
field = new TextEsField(name, properties, docValues, false, tsType);
106118
} else if (esDataType == KEYWORD) {
107119
int length = intSetting(content.get("ignore_above"), Short.MAX_VALUE);
108120
boolean normalized = Strings.hasText(textSetting(content.get("normalizer"), null));
109-
field = new KeywordEsField(name, properties, docValues, length, normalized, false, EsField.TimeSeriesFieldType.NONE);
121+
field = new KeywordEsField(name, properties, docValues, length, normalized, false, tsType);
110122
} else if (esDataType == DATETIME) {
111-
field = DateEsField.dateEsField(name, properties, docValues, EsField.TimeSeriesFieldType.NONE);
123+
field = DateEsField.dateEsField(name, properties, docValues, tsType);
112124
} else if (esDataType == UNSUPPORTED) {
113125
String type = content.get("type").toString();
114126
field = new UnsupportedEsField(name, List.of(type), null, properties);
115127
propagateUnsupportedType(name, type, properties);
116128
} else {
117-
field = new EsField(name, esDataType, properties, docValues, EsField.TimeSeriesFieldType.NONE);
129+
field = new EsField(name, esDataType, properties, docValues, tsType);
118130
}
119131
mapping.put(name, field);
120132
} else {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,4 +610,86 @@ public void testAvgOrSumOverTimeProfile() {
610610
}
611611
}
612612
}
613+
614+
/**
 * Verifies that rows whose metric field is null are skipped when a time-series
 * aggregation reads that metric, for both counter and gauge metric types.
 * Half of the indexed docs carry only {@code request_count} (counter) and the
 * other half only {@code memory} (gauge), so each query should see exactly 50 docs.
 */
public void testNullMetricsAreSkipped() {
    // Time-series index mode requires a routing path over the dimension fields.
    Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
    client().admin()
        .indices()
        .prepareCreate("sparse-hosts")
        .setSettings(settings)
        .setMapping(
            "@timestamp",
            "type=date",
            "host",
            "type=keyword,time_series_dimension=true",
            "cluster",
            "type=keyword,time_series_dimension=true",
            "memory",
            "type=long,time_series_metric=gauge",
            "request_count",
            "type=integer,time_series_metric=counter"
        )
        .get();
    List<Doc> sparseDocs = new ArrayList<>();
    // generate 100 docs, 50 will have a null metric
    // TODO: this is all copied from populateIndex(), refactor it sensibly.
    Map<String, String> hostToClusters = new HashMap<>();
    for (int i = 0; i < 5; i++) {
        hostToClusters.put("p" + i, randomFrom("qa", "prod"));
    }
    long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z");
    int numDocs = 100;
    // request_count is a counter, so it must be monotonically increasing per host.
    Map<String, Integer> requestCounts = new HashMap<>();
    for (int i = 0; i < numDocs; i++) {
        String host = randomFrom(hostToClusters.keySet());
        timestamp += between(1, 10) * 1000L;
        int requestCount = requestCounts.compute(host, (k, curr) -> {
            if (curr == null) {
                return randomIntBetween(0, 10);
            } else {
                return curr + randomIntBetween(1, 10);
            }
        });
        int cpu = randomIntBetween(0, 100);
        ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
        sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
    }

    // Index alternating docs with only one of the two metrics set, so every doc
    // has exactly one null metric: evens get request_count, odds get memory.
    Randomness.shuffle(sparseDocs);
    for (int i = 0; i < numDocs; i++) {
        Doc doc = sparseDocs.get(i);
        if (i % 2 == 0) {
            client().prepareIndex("sparse-hosts")
                .setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "request_count", doc.requestCount)
                .get();
        } else {
            client().prepareIndex("sparse-hosts")
                .setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "memory", doc.memory.getBytes())
                .get();
        }
    }
    client().admin().indices().prepareRefresh("sparse-hosts").get();
    // Control test: an explicit IS NOT NULL filter must see exactly the 50 docs
    // that carry request_count; if this fails, the data setup itself is broken.
    try (EsqlQueryResponse resp = run("""
        TS sparse-hosts
        | WHERE request_count IS NOT NULL
        | STATS sum(rate(request_count)) BY cluster, host
        """)) {
        assertEquals("Control failed, data loading is broken", 50, resp.documentsFound());
    }

    // No explicit filter: the planner should add the null filter for the gauge metric.
    try (EsqlQueryResponse resp = run("""
        TS sparse-hosts
        | STATS sum(max_over_time(memory)) BY cluster, host
        """)) {
        assertEquals("Did not filter nulls on gauge type", 50, resp.documentsFound());
    }

    // Same again for the counter metric.
    try (EsqlQueryResponse resp = run("""
        TS sparse-hosts
        | STATS sum(rate(request_count)) BY cluster, host
        """)) {
        assertEquals("Did not filter nulls on counter type", 50, resp.documentsFound());
    }
}
613695
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ public Iterator<Object> column(int columnIndex) {
196196
return ResponseValueUtils.valuesForColumn(columnIndex, columns.get(columnIndex).type(), pages);
197197
}
198198

199+
/**
 * @return the number of "documents" we got back from lucene, as input into the compute engine. Note that in this context, we think
 * of things like the result of LuceneMaxOperator as single documents, so this is not necessarily a count of matching index docs.
 */
public long documentsFound() {
    return documentsFound;
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.xpack.esql.plan.QueryPlan;
4242
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
4343
import org.elasticsearch.xpack.esql.plan.logical.Filter;
44+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
4445
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
4546
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4647
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
@@ -215,17 +216,17 @@ public static PhysicalPlan localPlan(
215216
.map(x -> ((LookupJoinExec) x).right())
216217
.collect(Collectors.toSet());
217218

218-
var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
219+
PhysicalPlan localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
219220
if (lookupJoinExecRightChildren.contains(f)) {
220221
// Do not optimize the right child of a lookup join exec
221222
// The data node does not have the right stats to perform the optimization because the stats are on the lookup node
222223
// Also we only ship logical plans across the network, so the plan needs to remain logical
223224
return f;
224225
}
225226
isCoordPlan.set(Boolean.FALSE);
226-
var optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
227-
var physicalFragment = localMapper.map(optimizedFragment);
228-
var filter = f.esFilter();
227+
LogicalPlan optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
228+
PhysicalPlan physicalFragment = localMapper.map(optimizedFragment);
229+
QueryBuilder filter = f.esFilter();
229230
if (filter != null) {
230231
physicalFragment = physicalFragment.transformUp(
231232
EsSourceExec.class,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ private static EsField createField(
225225
List<IndexFieldCapabilities> rest = fcs.subList(1, fcs.size());
226226
DataType type = EsqlDataTypeRegistry.INSTANCE.fromEs(first.type(), first.metricType());
227227
boolean aggregatable = first.isAggregatable();
228-
EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.UNKNOWN;
228+
EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(first);
229229
if (rest.isEmpty() == false) {
230230
for (IndexFieldCapabilities fc : rest) {
231231
if (first.metricType() != fc.metricType()) {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer;
9+
10+
import org.elasticsearch.index.query.QueryBuilder;
11+
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
12+
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
13+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
14+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
15+
import org.elasticsearch.xpack.esql.session.Configuration;
16+
17+
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
18+
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
19+
import static org.hamcrest.Matchers.is;
20+
21+
/**
22+
* Tests for the {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics} planner rule, to
23+
* verify that the filters are being pushed to Lucene.
24+
*/
25+
public class IgnoreNullMetricsPhysicalPlannerTests extends LocalPhysicalPlanOptimizerTests {
26+
public IgnoreNullMetricsPhysicalPlannerTests(String name, Configuration config) {
27+
super(name, config);
28+
}
29+
30+
/**
31+
* This tests that we get the same end result plan with an explicit isNotNull and the implicit one added by the rule
32+
*/
33+
public void testSamePhysicalPlans() {
34+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
35+
String testQuery = """
36+
TS k8s
37+
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
38+
| LIMIT 10
39+
""";
40+
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(testQuery);
41+
42+
String controlQuery = """
43+
TS k8s
44+
| WHERE network.total_bytes_in IS NOT NULL
45+
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
46+
| LIMIT 10
47+
""";
48+
PhysicalPlan expectedPlan = plannerOptimizerTimeSeries.plan(controlQuery);
49+
50+
assertEquals(NodeUtils.diffString(expectedPlan, actualPlan), expectedPlan, actualPlan);
51+
}
52+
53+
public void testPushdownOfSimpleCounterQuery() {
54+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
55+
String query = """
56+
TS k8s
57+
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
58+
| LIMIT 10
59+
""";
60+
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query);
61+
EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0);
62+
63+
QueryBuilder expected = unscore(existsQuery("network.total_bytes_in"));
64+
assertThat(queryExec.query().toString(), is(expected.toString()));
65+
}
66+
67+
public void testPushdownOfSimpleGagueQuery() {
68+
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
69+
String query = """
70+
TS k8s
71+
| STATS max(max_over_time(network.eth0.tx)) BY Bucket(@timestamp, 1 hour)
72+
| LIMIT 10
73+
""";
74+
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query);
75+
EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0);
76+
77+
QueryBuilder expected = unscore(existsQuery("network.eth0.tx"));
78+
assertThat(queryExec.query().toString(), is(expected.toString()));
79+
}
80+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
174174
protected TestPlannerOptimizer plannerOptimizer;
175175
private TestPlannerOptimizer plannerOptimizerDateDateNanosUnionTypes;
176176
private Analyzer timeSeriesAnalyzer;
177+
protected TestPlannerOptimizer plannerOptimizerTimeSeries;
177178
private final Configuration config;
178179
private final SearchStats IS_SV_STATS = new TestSearchStats() {
179180
@Override
@@ -240,6 +241,7 @@ public void init() {
240241
),
241242
TEST_VERIFIER
242243
);
244+
plannerOptimizerTimeSeries = new TestPlannerOptimizer(config, timeSeriesAnalyzer);
243245
}
244246

245247
private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichResolution) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.xpack.esql.analysis.Analyzer;
1212
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1313
import org.elasticsearch.xpack.esql.parser.EsqlParser;
14+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1415
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
1516
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1617
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -88,9 +89,9 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E
8889
}
8990

9091
private PhysicalPlan physicalPlan(String query, Analyzer analyzer) {
91-
var logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
92+
LogicalPlan logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
9293
// System.out.println("Logical\n" + logical);
93-
var physical = mapper.map(logical);
94+
PhysicalPlan physical = mapper.map(logical);
9495
return physical;
9596
}
9697
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/IgnoreNullMetricsTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,15 @@ public void testSimple() {
119119
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
120120
LogicalPlan actual = localPlan("""
121121
TS test
122-
| STATS max(max_over_time(metric_1))
122+
| STATS max(max_over_time(metric_1)) BY BUCKET(@timestamp, 1 min)
123123
| LIMIT 10
124124
""");
125125
Limit limit = as(actual, Limit.class);
126126
Aggregate agg = as(limit.child(), Aggregate.class);
127127
// The optimizer expands the STATS out into two STATS steps
128128
Aggregate tsAgg = as(agg.child(), Aggregate.class);
129-
Filter filter = as(tsAgg.child(), Filter.class);
129+
Eval bucketEval = as(tsAgg.child(), Eval.class);
130+
Filter filter = as(bucketEval.child(), Filter.class);
130131
IsNotNull condition = as(filter.condition(), IsNotNull.class);
131132
FieldAttribute attribute = as(condition.field(), FieldAttribute.class);
132133
assertEquals("metric_1", attribute.fieldName().string());

0 commit comments

Comments
 (0)