diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java b/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java
index eab2149019204..4631b1a94bb0f 100644
--- a/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java
+++ b/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java
@@ -50,6 +50,7 @@ public enum FieldType {
     TEXT("text"),
     IP("ip"),
     CONSTANT_KEYWORD("constant_keyword"),
+    PASSTHROUGH("passthrough"), // For now this field type does not have default generators.
     WILDCARD("wildcard");
 
     private final String name;
@@ -78,6 +79,7 @@ public FieldDataGenerator generator(String fieldName, DataSource dataSource) {
             case IP -> new IpFieldDataGenerator(dataSource);
             case CONSTANT_KEYWORD -> new ConstantKeywordFieldDataGenerator();
             case WILDCARD -> new WildcardFieldDataGenerator(dataSource);
+            case PASSTHROUGH -> throw new IllegalArgumentException("Passthrough field type does not have a default generator");
         };
     }
 
@@ -101,7 +103,8 @@ public static FieldType tryParse(String name) {
             case "ip" -> FieldType.IP;
             case "constant_keyword" -> FieldType.CONSTANT_KEYWORD;
             case "wildcard" -> FieldType.WILDCARD;
-            default -> null;
+            case "passthrough" -> FieldType.PASSTHROUGH;
+            default -> throw new IllegalArgumentException("Unknown field type: " + name);
         };
     }
diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java b/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java
index 795302e0972c7..0d66e7bd2062b 100644
--- a/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java
+++ b/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java
@@ -64,10 +64,8 @@ public Mapping generate(Template template) {
 
         rawMapping.put("_doc", topLevelMappingParameters);
 
-        if (specification.fullyDynamicMapping()) {
-            // Has to be "true" for fully dynamic mapping
+        if (specification.fullyDynamicMapping() == false) {
             topLevelMappingParameters.remove("dynamic");
-            return new Mapping(rawMapping, lookup);
         }
diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java
index 947a38839b259..74ebbc167f016 100644
--- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java
+++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java
@@ -46,7 +46,12 @@ public T get(DataSourceRequest request) {
                 return response;
             }
         }
-
-        throw new IllegalStateException("Request is not supported by data source");
+        throw new IllegalStateException(
+            "Request is not supported by data source. Request: "
+                + request.toString()
+                + "\n"
+                + "Available handlers: "
+                + handlers.stream().map(Object::getClass).map(Class::getName).toList().toString()
+        );
     }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java
index 2e234f8aec41c..974283362bc93 100644
--- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java
+++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java
@@ -48,6 +48,7 @@ public DataSourceResponse.LeafMappingParametersGenerator handle(DataSourceReques
             case IP -> ipMapping();
             case CONSTANT_KEYWORD -> constantKeywordMapping();
             case WILDCARD -> wildcardMapping();
+            case PASSTHROUGH -> throw new IllegalArgumentException("Unsupported field type: " + fieldType);
         });
     }
diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java
index bf660779186ca..10a3037b015ad 100644
--- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java
+++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java
@@ -12,8 +12,10 @@
 import org.elasticsearch.datageneration.FieldType;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.ESTestCase.randomDouble;
 import static org.elasticsearch.test.ESTestCase.randomIntBetween;
@@ -66,13 +68,17 @@ public String generateFieldName() {
 
     // UNSIGNED_LONG is excluded because it is mapped as long
     // and values larger than long fail to parse.
-    private static final Set<FieldType> EXCLUDED_FROM_DYNAMIC_MAPPING = Set.of(FieldType.UNSIGNED_LONG);
+    private static final Set<FieldType> EXCLUDED_FROM_DYNAMIC_MAPPING = Set.of(FieldType.UNSIGNED_LONG, FieldType.PASSTHROUGH);
+    private static final Set<FieldType> ALLOWED_FIELD_TYPES = Arrays.stream(FieldType.values())
+        .filter(fieldType -> EXCLUDED_FROM_DYNAMIC_MAPPING.contains(fieldType) == false)
+        .collect(Collectors.toSet());
 
     @Override
     public DataSourceResponse.FieldTypeGenerator handle(DataSourceRequest.FieldTypeGenerator request) {
-        return new DataSourceResponse.FieldTypeGenerator(
-            () -> new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(ESTestCase.randomFrom(FieldType.values()).toString())
-        );
+        return new DataSourceResponse.FieldTypeGenerator(() -> {
+            var fieldType = ESTestCase.randomFrom(ALLOWED_FIELD_TYPES);
+            return new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(fieldType.toString());
+        });
     }
 
     @Override
diff --git a/x-pack/plugin/esql/build.gradle b/x-pack/plugin/esql/build.gradle
index 2cae46d169793..00a247f854e50 100644
--- a/x-pack/plugin/esql/build.gradle
+++ b/x-pack/plugin/esql/build.gradle
@@ -35,6 +35,8 @@ dependencies {
   compileOnly project(':modules:lang-painless:spi')
   compileOnly project(xpackModule('esql-core'))
   compileOnly project(xpackModule('ml'))
+  compileOnly project(path: xpackModule('mapper-aggregate-metric'))
+  compileOnly project(path: xpackModule('downsample'))
   implementation project(xpackModule('kql'))
   implementation project('compute')
   implementation project('compute:ann')
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
new file mode 100644
index 0000000000000..25d43d218e116
--- /dev/null
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
@@ -0,0 +1,264 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.Build;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
+
+    private static final Long NUM_DOCS = 2500L;
+    private static final String DATASTREAM_NAME = "tsit_ds";
+    private List<XContentBuilder> documents = null;
+    private TSDataGenerationHelper dataGenerationHelper;
+
+    List<List<Object>> consumeRows(EsqlQueryResponse resp) {
+        List<List<Object>> rows = new ArrayList<>();
+        resp.rows().forEach(rowIter -> {
+            List<Object> row = new ArrayList<>();
+            rowIter.forEach(row::add);
+            rows.add(row);
+        });
+        return rows;
+    }
+
+    Map<List<String>, List<Map<String, Object>>> groupedRows(
+        List<XContentBuilder> docs,
+        List<String> groupingAttributes,
+        int secondsInWindow
+    ) {
+        Map<List<String>, List<Map<String, Object>>> groupedMap = new HashMap<>();
+        for (XContentBuilder doc : docs) {
+            Map<String, Object> docMap = XContentHelper.convertToMap(BytesReference.bytes(doc), false, XContentType.JSON).v2();
+            @SuppressWarnings("unchecked")
+            List<String> groupingPairs = groupingAttributes.stream()
+                .map(
+                    attr -> Tuple.tuple(
+                        attr,
+                        ((Map<String, Object>) docMap.getOrDefault("attributes", Map.of())).getOrDefault(attr, "").toString()
+                    )
+                )
+                .filter(val -> val.v2().isEmpty() == false) // Filter out empty values
+                .map(tup -> tup.v1() + ":" + tup.v2())
+                .toList();
+            long timeBucketStart = windowStart(docMap.get("@timestamp"), secondsInWindow);
+            var keyList = new ArrayList<>(groupingPairs);
+            keyList.add(Long.toString(timeBucketStart));
+            groupedMap.computeIfAbsent(keyList, k -> new ArrayList<>()).add(docMap);
+        }
+        return groupedMap;
+    }
+
+    static Long windowStart(Object timestampCell, int secondsInWindow) {
+        // This calculation looks a little weird, but it simply performs an integer division that
+        // throws away the remainder of the division by secondsInWindow. It rounds down
+        // the timestamp to the nearest multiple of secondsInWindow.
+        var timestampSeconds = Instant.parse((String) timestampCell).toEpochMilli() / 1000;
+        return (timestampSeconds / secondsInWindow) * secondsInWindow;
+    }
+
+    enum Agg {
+        MAX,
+        MIN,
+        AVG
+    }
+
+    static List<Integer> valuesInWindow(List<Map<String, Object>> pointsInGroup, String metricName) {
+        @SuppressWarnings("unchecked")
+        var values = pointsInGroup.stream()
+            .map(doc -> ((Map<String, Integer>) doc.get("metrics")).get(metricName))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+        return values;
+    }
+
+    static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
+        if (values.isEmpty()) {
+            throw new IllegalArgumentException("No values to aggregate for " + agg + " operation");
+        }
+        return switch (agg) {
+            case MAX -> Double.valueOf(values.stream().max(Integer::compareTo).orElseThrow());
+            case MIN -> Double.valueOf(values.stream().min(Integer::compareTo).orElseThrow());
+            case AVG -> values.stream().mapToDouble(Integer::doubleValue).average().orElseThrow();
+        };
+    }
+
+    static List<String> getRowKey(List<Object> row, List<String> groupingAttributes) {
+        List<String> rowKey = new ArrayList<>();
+        for (int i = 0; i < groupingAttributes.size(); i++) {
+            Object value = row.get(i + 4); // Skip the first four columns (the three aggregated values and tbucket)
+            if (value != null) {
+                rowKey.add(groupingAttributes.get(i) + ":" + value);
+            }
+        }
+        rowKey.add(Long.toString(Instant.parse((String) row.get(3)).toEpochMilli() / 1000));
+        return rowKey;
+    }
+
+    @Override
+    public EsqlQueryResponse run(EsqlQueryRequest request) {
+        assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot());
+        return super.run(request);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(
+            DataStreamsPlugin.class,
+            LocalStateCompositeXPackPlugin.class,
+            // Downsample.class, // TODO(pabloem): What are these
+            AggregateMetricMapperPlugin.class,
+            EsqlPlugin.class
+        );
+    }
+
+    void putTSDBIndexTemplate(List<String> patterns, @Nullable String mappingString) throws IOException {
+        Settings.Builder settingsBuilder = Settings.builder();
+        // Ensure it will be a TSDB data stream
+        settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES);
+        CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString);
+        TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(
+            RandomizedTimeSeriesIT.DATASTREAM_NAME
+        );
+        request.indexTemplate(
+            ComposableIndexTemplate.builder()
+                .indexPatterns(patterns)
+                .template(org.elasticsearch.cluster.metadata.Template.builder().settings(settingsBuilder).mappings(mappings))
+                .metadata(null)
+                .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
+                .build()
+        );
+        assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request));
+    }
+
+    @Before
+    public void populateIndex() throws IOException {
+        dataGenerationHelper = new TSDataGenerationHelper(NUM_DOCS);
+        final XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.map(dataGenerationHelper.mapping.raw());
+        final String jsonMappings = Strings.toString(builder);
+
+        putTSDBIndexTemplate(List.of(DATASTREAM_NAME + "*"), jsonMappings);
+        // Now we can push data into the data stream.
+        for (int i = 0; i < NUM_DOCS; i++) {
+            var document = dataGenerationHelper.generateDocument(Map.of());
+            if (documents == null) {
+                documents = new ArrayList<>();
+            }
+            documents.add(document);
+            var indexRequest = client().prepareIndex(DATASTREAM_NAME).setOpType(DocWriteRequest.OpType.CREATE).setSource(document);
+            indexRequest.setRefreshPolicy(org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE);
+            indexRequest.get();
+        }
+    }
+
+    /**
+     * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions.
+     * The subset of dimensions is a random subset of the dimensions present in the data.
+     * The test checks the max, min, and avg values of the gauge metric returned by the query
+     * against the same values calculated from the documents in each group.
+     */
+    public void testGroupBySubset() {
+        var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics);
+        var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
+        try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
+            TS %s
+            | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)),
+                min(min_over_time(metrics.gauge_hdd.bytes.used)),
+                avg(avg_over_time(metrics.gauge_hdd.bytes.used))
+                BY tbucket=bucket(@timestamp, 1 minute), %s
+            | SORT tbucket
+            | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) {
+            var groups = groupedRows(documents, dimensions, 60);
+            List<List<Object>> rows = consumeRows(resp);
+            for (List<Object> row : rows) {
+                var rowKey = getRowKey(row, dimensions);
+                var docValues = valuesInWindow(groups.get(rowKey), "gauge_hdd.bytes.used");
+                // Max of int is always int, so we can safely round the result.
+                assertThat(row.getFirst(), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX))));
+                assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN))));
+                // We check the expected vs ES-calculated average. We divide them to normalize the error
+                // and allow for a 20% error margin.
+                Double esAvg = (Double) row.get(2);
+                Double expectedAvg = aggregateValuesInWindow(docValues, Agg.AVG);
+                var ratio = esAvg / expectedAvg;
+                assertThat(ratio, closeTo(1, 0.2));
+            }
+        }
+    }
+
+    /**
+     * This test validates Gauge metrics aggregation with grouping by time bucket only.
+     * The test checks the max, min, and avg values of the gauge metric returned by the query
+     * against the same values calculated from the documents in each group. Because there is no grouping
+     * by dimensions, there is only one metric group per time bucket.
+     */
+    public void testGroupByNothing() {
+        try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
+            TS %s
+            | STATS
+                max(max_over_time(metrics.gauge_hdd.bytes.used)),
+                min(min_over_time(metrics.gauge_hdd.bytes.used)),
+                avg(avg_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute)
+            | SORT tbucket
+            | LIMIT 1000""", DATASTREAM_NAME))) {
+            List<List<Object>> rows = consumeRows(resp);
+            var groups = groupedRows(documents, List.of(), 60);
+            for (List<Object> row : rows) {
+                var windowStart = windowStart(row.get(3), 60);
+                var docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
+                // Min and Max of int are always int, so we can safely round the result.
+                assertThat(row.getFirst(), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX))));
+                assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN))));
+                // We check the expected vs ES-calculated average.
+                // We divide them to normalize the error
+                // and allow for a 20% error margin.
+                Double esAvg = (Double) row.get(2);
+                Double expectedAvg = aggregateValuesInWindow(docValues, Agg.AVG);
+                var ratio = esAvg / expectedAvg;
+                assertThat(ratio, closeTo(1, 0.2));
+            }
+        }
+    }
+}
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
new file mode 100644
index 0000000000000..8961419f6b6ba
--- /dev/null
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.network.NetworkAddress;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.datageneration.DataGeneratorSpecification;
+import org.elasticsearch.datageneration.DocumentGenerator;
+import org.elasticsearch.datageneration.FieldType;
+import org.elasticsearch.datageneration.Mapping;
+import org.elasticsearch.datageneration.MappingGenerator;
+import org.elasticsearch.datageneration.Template;
+import org.elasticsearch.datageneration.TemplateGenerator;
+import org.elasticsearch.datageneration.fields.PredefinedField;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+class TSDataGenerationHelper {
+
+    private static Object randomDimensionValue(String dimensionName) {
+        // We use dimensionName to determine the type of the value.
+        var isNumeric = dimensionName.hashCode() % 5 == 0;
+        var isIP = dimensionName.hashCode() % 5 == 1;
+        if (isNumeric) {
+            // Numeric values are sometimes passed as integers and sometimes as strings.
+            return ESTestCase.randomIntBetween(1, 1000);
+        } else if (isIP) {
+            // TODO: Make sure the schema ingests this as an IP address.
+            return NetworkAddress.format(ESTestCase.randomIp(ESTestCase.randomBoolean()));
+        } else {
+            return ESTestCase.randomAlphaOfLengthBetween(1, 20);
+        }
+    }
+
+    TSDataGenerationHelper(long numDocs) {
+        // Metrics coming into our system have a pre-set group of attributes.
+        // Making a list-to-set-to-list to ensure uniqueness.
+        this.numDocs = numDocs;
+        attributesForMetrics = List.copyOf(Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(2, 30))));
+        numTimeSeries = ESTestCase.randomIntBetween(10, (int) Math.sqrt(numDocs));
+        // allTimeSeries contains the list of dimension-values for each time series.
+        List<List<Tuple<String, Object>>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> {
+            List<String> dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics);
+            // TODO: How do we handle the case when there are no dimensions? (i.e. regular randomSubsetOf(...))
+            return dimensionsInMetric.stream().map(attr -> new Tuple<>(attr, randomDimensionValue(attr))).collect(Collectors.toList());
+        }).toList();
+
+        spec = DataGeneratorSpecification.builder()
+            .withMaxFieldCountPerLevel(0)
+            .withPredefinedFields(
+                List.of(
+                    new PredefinedField.WithGenerator(
+                        "@timestamp",
+                        FieldType.DATE,
+                        Map.of("type", "date"),
+                        fieldMapping -> ESTestCase.randomInstantBetween(Instant.now().minusSeconds(2 * 60 * 60), Instant.now())
+                    ),
+                    new PredefinedField.WithGenerator(
+                        "attributes",
+                        FieldType.PASSTHROUGH,
+                        Map.of("type", "passthrough", "time_series_dimension", true, "dynamic", true, "priority", 1),
+                        (ignored) -> {
+                            var tsDimensions = ESTestCase.randomFrom(allTimeSeries);
+                            return tsDimensions.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2));
+                        }
+                    ),
+                    new PredefinedField.WithGenerator(
+                        "metrics",
+                        FieldType.PASSTHROUGH,
+                        Map.of("type", "passthrough", "dynamic", true, "priority", 10),
+                        (ignored) -> Map.of("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L))
+                    )
+                )
+            )
+            .build();
+
+        documentGenerator = new DocumentGenerator(spec);
+        template = new TemplateGenerator(spec).generate();
+        mapping = new MappingGenerator(spec).generate(template);
+        var doc = mapping.raw().get("_doc");
+        @SuppressWarnings("unchecked")
+        Map<String, Object> docMap = ((Map<String, Object>) doc);
+        // Add dynamic templates to the mapping
+        docMap.put(
+            "dynamic_templates",
+            List.of(
+                Map.of(
+                    "counter_long",
+                    Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "long", "time_series_metric", "counter"))
+                ),
+                Map.of(
+                    "gauge_long",
+                    Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge"))
+                )
+                // TODO: Add double and other metric types
+            )
+        );
+    }
+
+    final DataGeneratorSpecification spec;
+    final DocumentGenerator documentGenerator;
+    final Template template;
+    final Mapping mapping;
+    final int numTimeSeries;
+    final long numDocs;
+    final List<String> attributesForMetrics;
+
+    XContentBuilder generateDocument(Map<String, Object> additionalFields) throws IOException {
+        var doc = XContentFactory.jsonBuilder();
+        var generated = documentGenerator.generate(template, mapping);
+        generated.putAll(additionalFields);
+
+        doc.map(generated);
+        return doc;
+    }
+}