- 
                Notifications
    You must be signed in to change notification settings 
- Fork 25.6k
Esql Fix bug with loading TS metadata #134648
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
f5f97f5
              fae7f67
              1d45d86
              32fb52b
              b9623ad
              efb5dd4
              ad3603d
              07d77b1
              3892be0
              c4fdf1c
              91fddd3
              19c15be
              784b289
              d636a9f
              8c11ae9
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -100,21 +100,33 @@ private static void walkMapping(String name, Object value, Map<String, EsField> | |
| properties = fromEs(content); | ||
| } | ||
| boolean docValues = boolSetting(content.get("doc_values"), esDataType.hasDocValues()); | ||
| boolean isDimension = boolSetting(content.get("time_series_dimension"), false); | ||
| boolean isMetric = content.containsKey("time_series_metric"); | ||
| if (isDimension && isMetric) { | ||
| throw new IllegalArgumentException("Field configured as both dimension and metric:" + value); | ||
|          | ||
| } | ||
| EsField.TimeSeriesFieldType tsType = EsField.TimeSeriesFieldType.NONE; | ||
| if (isDimension) { | ||
| tsType = EsField.TimeSeriesFieldType.DIMENSION; | ||
| } | ||
| if (isMetric) { | ||
| tsType = EsField.TimeSeriesFieldType.METRIC; | ||
| } | ||
| final EsField field; | ||
| if (esDataType == TEXT) { | ||
| field = new TextEsField(name, properties, docValues, false, EsField.TimeSeriesFieldType.NONE); | ||
| field = new TextEsField(name, properties, docValues, false, tsType); | ||
| } else if (esDataType == KEYWORD) { | ||
| int length = intSetting(content.get("ignore_above"), Short.MAX_VALUE); | ||
| boolean normalized = Strings.hasText(textSetting(content.get("normalizer"), null)); | ||
| field = new KeywordEsField(name, properties, docValues, length, normalized, false, EsField.TimeSeriesFieldType.NONE); | ||
| field = new KeywordEsField(name, properties, docValues, length, normalized, false, tsType); | ||
| } else if (esDataType == DATETIME) { | ||
| field = DateEsField.dateEsField(name, properties, docValues, EsField.TimeSeriesFieldType.NONE); | ||
| field = DateEsField.dateEsField(name, properties, docValues, tsType); | ||
| } else if (esDataType == UNSUPPORTED) { | ||
| String type = content.get("type").toString(); | ||
| field = new UnsupportedEsField(name, List.of(type), null, properties); | ||
| propagateUnsupportedType(name, type, properties); | ||
| } else { | ||
| field = new EsField(name, esDataType, properties, docValues, EsField.TimeSeriesFieldType.NONE); | ||
| field = new EsField(name, esDataType, properties, docValues, tsType); | ||
| } | ||
| mapping.put(name, field); | ||
| } else { | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -610,4 +610,89 @@ public void testAvgOrSumOverTimeProfile() { | |
| } | ||
| } | ||
| } | ||
|  | ||
| public void testNullMetricsAreSkipped() { | ||
| Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build(); | ||
| client().admin() | ||
| .indices() | ||
| .prepareCreate("sparse-hosts") | ||
| .setSettings(settings) | ||
| .setMapping( | ||
| "@timestamp", | ||
| "type=date", | ||
| "host", | ||
| "type=keyword,time_series_dimension=true", | ||
| "cluster", | ||
| "type=keyword,time_series_dimension=true", | ||
| "memory", | ||
| "type=long,time_series_metric=gauge", | ||
| "request_count", | ||
| "type=integer,time_series_metric=counter" | ||
| ) | ||
| .get(); | ||
| List<Doc> sparseDocs = new ArrayList<>(); | ||
| // generate 100 docs, 50 will have a null metric | ||
| // TODO: this is all copied from populateIndex(), refactor it sensibly. | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I really don't want to refactor this test today, and copying a few dozen lines of test code isn't that big a sin. I'm going to leave this TODO for someone to take on a slow Friday. | ||
| Map<String, String> hostToClusters = new HashMap<>(); | ||
| for (int i = 0; i < 5; i++) { | ||
| hostToClusters.put("p" + i, randomFrom("qa", "prod")); | ||
| } | ||
| long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); | ||
| int numDocs = 100; | ||
| Map<String, Integer> requestCounts = new HashMap<>(); | ||
| for (int i = 0; i < numDocs; i++) { | ||
| String host = randomFrom(hostToClusters.keySet()); | ||
| timestamp += between(1, 10) * 1000L; | ||
| int requestCount = requestCounts.compute(host, (k, curr) -> { | ||
| if (curr == null) { | ||
| return randomIntBetween(0, 10); | ||
| } else { | ||
| return curr + randomIntBetween(1, 10); | ||
| } | ||
| }); | ||
| int cpu = randomIntBetween(0, 100); | ||
| ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024)); | ||
| sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory)); | ||
| } | ||
|  | ||
| Randomness.shuffle(sparseDocs); | ||
| for (int i = 0; i < numDocs; i++) { | ||
| Doc doc = sparseDocs.get(i); | ||
| if (i % 2 == 0) { | ||
| client().prepareIndex("sparse-hosts") | ||
| .setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "request_count", doc.requestCount) | ||
| .get(); | ||
| } else { | ||
| client().prepareIndex("sparse-hosts") | ||
| .setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "memory", doc.memory.getBytes()) | ||
| .get(); | ||
| } | ||
| } | ||
| client().admin().indices().prepareRefresh("sparse-hosts").get(); | ||
| // Control test | ||
| /* | ||
| try (EsqlQueryResponse resp = run(""" | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. leftover? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably worth keeping, I just forgot to uncomment it after running the debugger. | ||
| TS sparse-hosts | ||
| | WHERE request_count IS NOT NULL | ||
| | STATS sum(rate(request_count)) BY cluster, host | ||
| """)) { | ||
| assertEquals("Control failed, data loading is broken", 50, resp.documentsFound()); | ||
| } | ||
|  | ||
| */ | ||
|  | ||
| try (EsqlQueryResponse resp = run(""" | ||
| TS sparse-hosts | ||
| | STATS sum(max_over_time(memory)) BY cluster, host | ||
| """)) { | ||
| assertEquals("Did not filter nulls on gauge type", 50, resp.documentsFound()); | ||
| } | ||
|  | ||
| try (EsqlQueryResponse resp = run(""" | ||
| TS sparse-hosts | ||
| | STATS sum(rate(request_count)) BY cluster, host | ||
| """)) { | ||
| assertEquals("Did not filter nulls on counter type", 50, resp.documentsFound()); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -225,7 +225,7 @@ private static EsField createField( | |
| List<IndexFieldCapabilities> rest = fcs.subList(1, fcs.size()); | ||
| DataType type = EsqlDataTypeRegistry.INSTANCE.fromEs(first.type(), first.metricType()); | ||
| boolean aggregatable = first.isAggregatable(); | ||
| EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.UNKNOWN; | ||
| EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(first); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the entire bug fix. Everything else in this PR is tests and test scaffolding. | ||
| if (rest.isEmpty() == false) { | ||
| for (IndexFieldCapabilities fc : rest) { | ||
| if (first.metricType() != fc.metricType()) { | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|  | ||
| package org.elasticsearch.xpack.esql.optimizer; | ||
|  | ||
| import org.elasticsearch.index.query.QueryBuilder; | ||
| import org.elasticsearch.xpack.esql.action.EsqlCapabilities; | ||
| import org.elasticsearch.xpack.esql.core.tree.NodeUtils; | ||
| import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; | ||
| import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; | ||
| import org.elasticsearch.xpack.esql.session.Configuration; | ||
|  | ||
| import static org.elasticsearch.index.query.QueryBuilders.existsQuery; | ||
| import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore; | ||
| import static org.hamcrest.Matchers.is; | ||
|  | ||
| /** | ||
| * Tests for the {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics} planner rule, to | ||
| * verify that the filters are being pushed to Lucene. | ||
| */ | ||
| public class IgnoreNullMetricsPhysicalPlannerTests extends LocalPhysicalPlanOptimizerTests { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inheriting from  | ||
| public IgnoreNullMetricsPhysicalPlannerTests(String name, Configuration config) { | ||
| super(name, config); | ||
| } | ||
|  | ||
| /** | ||
| * This tests that we get the same end result plan with an explicit isNotNull and the implicit one added by the rule | ||
| */ | ||
| public void testSamePhysicalPlans() { | ||
| assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); | ||
| String testQuery = """ | ||
| TS k8s | ||
| | STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour) | ||
| | LIMIT 10 | ||
| """; | ||
| PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(testQuery); | ||
|  | ||
| String controlQuery = """ | ||
| TS k8s | ||
| | WHERE network.total_bytes_in IS NOT NULL | ||
| | STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour) | ||
| | LIMIT 10 | ||
| """; | ||
| PhysicalPlan expectedPlan = plannerOptimizerTimeSeries.plan(controlQuery); | ||
|  | ||
| assertEquals(NodeUtils.diffString(expectedPlan, actualPlan), expectedPlan, actualPlan); | ||
| } | ||
|  | ||
| public void testPushdownOfSimpleCounterQuery() { | ||
| assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); | ||
| String query = """ | ||
| TS k8s | ||
| | STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour) | ||
| | LIMIT 10 | ||
| """; | ||
| PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query); | ||
| EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0); | ||
|  | ||
| QueryBuilder expected = unscore(existsQuery("network.total_bytes_in")); | ||
| assertThat(queryExec.query().toString(), is(expected.toString())); | ||
| } | ||
|  | ||
| public void testPushdownOfSimpleGagueQuery() { | ||
| assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); | ||
| String query = """ | ||
| TS k8s | ||
| | STATS max(max_over_time(network.eth0.tx)) BY Bucket(@timestamp, 1 hour) | ||
| | LIMIT 10 | ||
| """; | ||
| PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query); | ||
| EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0); | ||
|  | ||
| QueryBuilder expected = unscore(existsQuery("network.eth0.tx")); | ||
| assertThat(queryExec.query().toString(), is(expected.toString())); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -119,14 +119,15 @@ public void testSimple() { | |
| assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); | ||
| LogicalPlan actual = localPlan(""" | ||
| TS test | ||
| | STATS max(max_over_time(metric_1)) | ||
| | STATS max(max_over_time(metric_1)) BY BUCKET(@timestamp, 1 min) | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sure we're testing this rule and not  | ||
| | LIMIT 10 | ||
| """); | ||
| Limit limit = as(actual, Limit.class); | ||
| Aggregate agg = as(limit.child(), Aggregate.class); | ||
| // The optimizer expands the STATS out into two STATS steps | ||
| Aggregate tsAgg = as(agg.child(), Aggregate.class); | ||
| Filter filter = as(tsAgg.child(), Filter.class); | ||
| Eval bucketEval = as(tsAgg.child(), Eval.class); | ||
| Filter filter = as(bucketEval.child(), Filter.class); | ||
| IsNotNull condition = as(filter.condition(), IsNotNull.class); | ||
| FieldAttribute attribute = as(condition.field(), FieldAttribute.class); | ||
| assertEquals("metric_1", attribute.fieldName().string()); | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes are necessary to enable running planner tests that load dimension and metric information from our json schema files. My earlier tests for this feature hard coded a test schema rather than load from json, so I didn't need this before, but we'll want it for many tests going forward.