From 613bda84501238f980792bccca9742cea262a204 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 7 Aug 2025 16:57:14 -0700 Subject: [PATCH 1/9] First test case in prototype messy test file --- .../datageneration/FieldType.java | 2 + .../datageneration/MappingGenerator.java | 2 +- .../datageneration/datasource/DataSource.java | 4 +- .../DefaultMappingParametersHandler.java | 1 + .../DefaultObjectGenerationHandler.java | 14 +- x-pack/plugin/esql/build.gradle | 2 + .../xpack/esql/action/GenerativeTSIT.java | 303 ++++++++++++++++++ 7 files changed, 323 insertions(+), 5 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java b/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java index eab2149019204..38c7e8bd177fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java @@ -50,6 +50,7 @@ public enum FieldType { TEXT("text"), IP("ip"), CONSTANT_KEYWORD("constant_keyword"), + PASSTHROUGH("passthrough"), // For now this field type does not have default generators. WILDCARD("wildcard"); private final String name; @@ -78,6 +79,7 @@ public FieldDataGenerator generator(String fieldName, DataSource dataSource) { case IP -> new IpFieldDataGenerator(dataSource); case CONSTANT_KEYWORD -> new ConstantKeywordFieldDataGenerator(); case WILDCARD -> new WildcardFieldDataGenerator(dataSource); + default -> null; }; } diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java b/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java index 795302e0972c7..30cc52c4334b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java @@ -64,7 +64,7 @@ public Mapping generate(Template template) { rawMapping.put("_doc", topLevelMappingParameters); - if (specification.fullyDynamicMapping()) { + if (specification.fullyDynamicMapping() == false) { // Has to be "true" for fully dynamic mapping topLevelMappingParameters.remove("dynamic"); diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java index 947a38839b259..b324643a39000 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java @@ -46,7 +46,7 @@ public T get(DataSourceRequest request) { return response; } } - - throw new IllegalStateException("Request is not supported by data source"); + throw new IllegalStateException("Request is not supported by data source. 
Request: " + request.toString() + "\n" + + "Available handlers: " + handlers.stream().map(Object::getClass).map(Class::getName).toList().toString()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java index 2e234f8aec41c..36ce5e013a99f 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java @@ -48,6 +48,7 @@ public DataSourceResponse.LeafMappingParametersGenerator handle(DataSourceReques case IP -> ipMapping(); case CONSTANT_KEYWORD -> constantKeywordMapping(); case WILDCARD -> wildcardMapping(); + default -> throw new IllegalArgumentException("Unsupported field type: " + fieldType); }); } diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java index c1d0cccda83ea..4c86d863d3eb5 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java @@ -12,8 +12,11 @@ import org.elasticsearch.datageneration.FieldType; import org.elasticsearch.test.ESTestCase; +import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween; import static org.elasticsearch.test.ESTestCase.randomDouble; @@ -56,12 +59,19 @@ public String generateFieldName() { // UNSIGNED_LONG is excluded because it is mapped as long // and values larger than long fail to parse. - private static final Set EXCLUDED_FROM_DYNAMIC_MAPPING = Set.of(FieldType.UNSIGNED_LONG); + private static final Set EXCLUDED_FROM_DYNAMIC_MAPPING = Set.of(FieldType.UNSIGNED_LONG, FieldType.PASSTHROUGH); @Override public DataSourceResponse.FieldTypeGenerator handle(DataSourceRequest.FieldTypeGenerator request) { return new DataSourceResponse.FieldTypeGenerator( - () -> new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(ESTestCase.randomFrom(FieldType.values()).toString()) + () -> { + // All field types minus the excluded ones. 
+ var fieldTypes = Arrays.stream(FieldType.values()) + .filter(fieldType -> EXCLUDED_FROM_DYNAMIC_MAPPING.contains(fieldType) == false) + .collect(Collectors.toSet()); + var fieldType = ESTestCase.randomFrom(fieldTypes); + return new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(fieldType.toString()); + } ); } diff --git a/x-pack/plugin/esql/build.gradle b/x-pack/plugin/esql/build.gradle index cf49e644e4313..4c068a6b13d0d 100644 --- a/x-pack/plugin/esql/build.gradle +++ b/x-pack/plugin/esql/build.gradle @@ -35,6 +35,8 @@ dependencies { compileOnly project(':modules:lang-painless:spi') compileOnly project(xpackModule('esql-core')) compileOnly project(xpackModule('ml')) + compileOnly project(path: xpackModule('mapper-aggregate-metric')) + compileOnly project(path: xpackModule('downsample')) implementation project(xpackModule('kql')) implementation project('compute') implementation project('compute:ann') diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java new file mode 100644 index 0000000000000..207fe192d8e53 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java @@ -0,0 +1,303 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.Build; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.datageneration.DataGeneratorSpecification; +import org.elasticsearch.datageneration.DocumentGenerator; +import org.elasticsearch.datageneration.FieldType; +import org.elasticsearch.datageneration.Mapping; +import org.elasticsearch.datageneration.MappingGenerator; +import org.elasticsearch.datageneration.Template; +import org.elasticsearch.datageneration.TemplateGenerator; +import org.elasticsearch.datageneration.fields.PredefinedField; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.junit.Before; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; + +public class GenerativeTSIT extends AbstractEsqlIntegTestCase { + + private static final String DATASTREAM_NAME = "tsit_ds"; + private List documents = null; + + static class DataGenerationHelper { + + private static Object randomDimensionValue(String dimensionName) { + // We use dimensionName to determine the type of the value. + var isNumeric = dimensionName.hashCode() % 5 == 0; + if (isNumeric) { + // Numeric values are sometimes passed as integers and sometimes as strings. + return ESTestCase.randomBoolean() + ? ESTestCase.randomIntBetween(1, 1000) + : Integer.toString(ESTestCase.randomIntBetween(1, 1000)); + } else { + return ESTestCase.randomAlphaOfLengthBetween(1, 20); + } + } + + DataGenerationHelper() { + // Metrics coming into our system have a pre-set group of attributes. + List attributesForMetrics = ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30)); + int numTimeSeries = ESTestCase.randomIntBetween(100, 1000); // TODO: Larger size of timeseries + // allTimeSeries contains the list of dimension-values for each time series. + List>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> { + List dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics); + // TODO: How do we handle the case when there are no dimensions? + return dimensionsInMetric.stream().map(attr -> new Tuple<>(attr, randomDimensionValue(attr))).collect(Collectors.toList()); + }).toList(); + + spec = DataGeneratorSpecification.builder() + .withPredefinedFields( + List.of( + new PredefinedField.WithGenerator( + "@timestamp", + FieldType.DATE, + Map.of("type", "date"), + fieldMapping -> ESTestCase.randomInstantBetween(Instant.now().minusSeconds(2 * 60 * 60), Instant.now()) + ), + new PredefinedField.WithGenerator( + "attributes", + FieldType.PASSTHROUGH, + Map.of("type", "passthrough", "time_series_dimension", true, "dynamic", true, "priority", 1), + (ignored) -> { + var tsDimensions = ESTestCase.randomFrom(allTimeSeries); + return tsDimensions.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2)); + } + ), + // TODO: How do we add a `metrics` top field that contains all metrics and indexes + // them dynamically? 
+ new PredefinedField.WithGenerator( + "metrics", + FieldType.PASSTHROUGH, + Map.of("type", "passthrough", "dynamic", true, "priority", 10), + (ignored) -> Map.of("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L)) + ) + ) + ) + .build(); + + documentGenerator = new DocumentGenerator(spec); + template = new TemplateGenerator(spec).generate(); + mapping = new MappingGenerator(spec).generate(template); + var doc = mapping.raw().get("_doc"); + @SuppressWarnings("unchecked") + Map docMap = ((Map) doc); + // Add dynamic templates to the mapping + docMap.put( + "dynamic_templates", + List.of( + Map.of( + "counter_long", + Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "long", "time_series_metric", "counter")) + ), + Map.of( + "gauge_long", + Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge")) + ), + Map.of( + "counter_double", + Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "double", "time_series_metric", "counter")) + ), + Map.of( + "gauge_double", + Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge")) + ) + ) + ); + } + + final DataGeneratorSpecification spec; + final DocumentGenerator documentGenerator; + final Template template; + final Mapping mapping; + + XContentBuilder generateDocument(Map additionalFields) throws IOException { + var doc = XContentFactory.jsonBuilder(); + var generated = documentGenerator.generate(template, mapping); + generated.putAll(additionalFields); + + doc.map(generated); + return doc; + } + } + + Map, List>> groupedRows( + List docs, + List groupingAttributes, + int secondsInWindow + ) { + Map, List>> groupedMap = new HashMap<>(); + for (XContentBuilder doc : docs) { + Map docMap = XContentHelper.convertToMap(BytesReference.bytes(doc), false, XContentType.JSON).v2(); + @SuppressWarnings("unchecked") + List groupingValues = groupingAttributes.stream() + .map(attr -> ((Map) docMap.get("attributes")).get(attr).toString()) + .collect(Collectors.toList()); + // TODO: Verify that this window start calculation is correct. + long timeBucketStart = Instant.parse(((String) docMap.get("@timestamp"))).toEpochMilli() / 1000 / secondsInWindow + * secondsInWindow; + String key = String.join("|", groupingValues) + "|" + timeBucketStart; + // TODO: Why use this pipe syntax lol + groupedMap.computeIfAbsent(List.of(key), k -> new ArrayList<>()).add(docMap); + } + return groupedMap; + } + + @Override + public EsqlQueryResponse run(EsqlQueryRequest request) { + assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot()); + return super.run(request); + } + + @Override + protected Collection> nodePlugins() { + return List.of( + DataStreamsPlugin.class, + LocalStateCompositeXPackPlugin.class, + // Downsample.class, // TODO(pabloem): What are these + AggregateMetricMapperPlugin.class, + EsqlPlugin.class + ); + } + + void putTSDBIndexTemplate( + String id, + List patterns, + @Nullable Settings settings, + @Nullable String mappingString, + @Nullable DataStreamLifecycle.Template lifecycle, + @Nullable Map metadata + ) throws IOException { + Settings.Builder settingsBuilder = Settings.builder(); + if (settings != null) { + settingsBuilder.put(settings); + } + // Ensure it will be a TSDB data stream + settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES); + settingsBuilder.putList("index.routing_path", List.of("attributes.*")); + CompressedXContent mappings = mappingString == null ? 
null : CompressedXContent.fromJSON(mappingString); + // print the mapping + TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); + request.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(patterns) + .template( + org.elasticsearch.cluster.metadata.Template.builder().settings(settingsBuilder).mappings(mappings).lifecycle(lifecycle) + ) + .metadata(metadata) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build() + ); + assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request)); + } + + @Before + public void populateIndex() throws IOException { + var dataGenHelper = new DataGenerationHelper(); + final XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.map(dataGenHelper.mapping.raw()); + // print the mapping + System.out.println("PABLO Data stream mapping: " + Strings.toString(builder)); + final String jsonMappings = Strings.toString(builder); + + putTSDBIndexTemplate(DATASTREAM_NAME, List.of(DATASTREAM_NAME + "*"), null, jsonMappings, null, null); + // Now we can push data into the data stream. + for (int i = 0; i < 1000; i++) { + var document = dataGenHelper.generateDocument(Map.of()); + if (documents == null) { + documents = new ArrayList<>(); + } + documents.add(document); + var indexRequest = client().prepareIndex(DATASTREAM_NAME).setOpType(DocWriteRequest.OpType.CREATE).setSource(document); + indexRequest.setRefreshPolicy(org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE); + indexRequest.get(); + } + } + + public void testGroupByNothing() { + try ( + var resp = run( + String.format( + """ + TS %s + | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)), avg(avg_over_time(metrics.gauge_hdd.bytes.used)), min(min_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute) + | SORT tbucket + | LIMIT 100""", + DATASTREAM_NAME + ) + ) + ) { + List> rows = new ArrayList<>(); + resp.rows().forEach(rowIter -> { + List row = new ArrayList<>(); + rowIter.forEach(row::add); + rows.add(row); + }); + var groups = groupedRows(documents, List.of(), 60); + for (int i = 0; i < rows.size(); i++) { + var row = rows.get(i); + var windowStart = Instant.parse((String) row.getLast()).toEpochMilli() / 1000 / 60 * 60; + var windowDataPoints = groups.get(List.of(String.format("|%d", windowStart))); + @SuppressWarnings("unchecked") + var docValues = windowDataPoints.stream() + .map(doc -> ((Map) doc.get("metrics")).get("gauge_hdd.bytes.used")) + .toList(); + docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> { + var res = ((Long) row.getFirst()).intValue(); + assertThat(res, equalTo(maxValue)); + }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); }); + docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> { + var res = (Double) row.get(1); + assertThat(res, closeTo(avgValue, res * 0.5)); + }, () -> { + ; + throw new AssertionError("No values found for window starting at " + windowStart); + }); + docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> { + var res = ((Long) row.get(2)).intValue(); + assertThat(res, equalTo(minValue)); + }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); }); + } + } + } +} From 2d70ceffed9ab7039b619f90afde4d51fcad48fa Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 8 Aug 2025 00:04:38 +0000 Subject: [PATCH 2/9] [CI] 
Auto commit changes from spotless --- .../datageneration/datasource/DataSource.java | 9 +++++++-- .../DefaultObjectGenerationHandler.java | 19 ++++++++----------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java index b324643a39000..74ebbc167f016 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DataSource.java @@ -46,7 +46,12 @@ public T get(DataSourceRequest request) { return response; } } - throw new IllegalStateException("Request is not supported by data source. Request: " + request.toString() + "\n" + - "Available handlers: " + handlers.stream().map(Object::getClass).map(Class::getName).toList().toString()); + throw new IllegalStateException( + "Request is not supported by data source. Request: " + + request.toString() + + "\n" + + "Available handlers: " + + handlers.stream().map(Object::getClass).map(Class::getName).toList().toString() + ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java index 4c86d863d3eb5..87ff5d0756577 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java @@ -13,7 +13,6 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -63,16 +62,14 @@ public String generateFieldName() { @Override public DataSourceResponse.FieldTypeGenerator handle(DataSourceRequest.FieldTypeGenerator request) { - return new DataSourceResponse.FieldTypeGenerator( - () -> { - // All field types minus the excluded ones. - var fieldTypes = Arrays.stream(FieldType.values()) - .filter(fieldType -> EXCLUDED_FROM_DYNAMIC_MAPPING.contains(fieldType) == false) - .collect(Collectors.toSet()); - var fieldType = ESTestCase.randomFrom(fieldTypes); - return new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(fieldType.toString()); - } - ); + return new DataSourceResponse.FieldTypeGenerator(() -> { + // All field types minus the excluded ones. 
+ var fieldTypes = Arrays.stream(FieldType.values()) + .filter(fieldType -> EXCLUDED_FROM_DYNAMIC_MAPPING.contains(fieldType) == false) + .collect(Collectors.toSet()); + var fieldType = ESTestCase.randomFrom(fieldTypes); + return new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(fieldType.toString()); + }); } @Override From 19e2f8ae0e0e1a2306b7452016d8d09fe511deed Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 8 Aug 2025 17:03:01 -0700 Subject: [PATCH 3/9] First two randomized test cases --- .../xpack/esql/action/GenerativeTSIT.java | 113 +++++++++++++----- 1 file changed, 84 insertions(+), 29 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java index 207fe192d8e53..9ab7737857645 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -59,8 +60,9 @@ public class GenerativeTSIT extends AbstractEsqlIntegTestCase { private static final String DATASTREAM_NAME = "tsit_ds"; private List documents = null; + private DataGenerationHelper dataGenerationHelper; - static class DataGenerationHelper { + static final class DataGenerationHelper { private static Object randomDimensionValue(String dimensionName) { // We use dimensionName to determine the type of the value. @@ -77,16 +79,18 @@ private static Object randomDimensionValue(String dimensionName) { DataGenerationHelper() { // Metrics coming into our system have a pre-set group of attributes. - List attributesForMetrics = ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30)); - int numTimeSeries = ESTestCase.randomIntBetween(100, 1000); // TODO: Larger size of timeseries + attributesForMetrics = ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30)); + numTimeSeries = ESTestCase.randomIntBetween(10, 50); // TODO: Larger size of timeseries + // System.out.println("Total of time series: " + numTimeSeries); // allTimeSeries contains the list of dimension-values for each time series. List>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> { List dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics); - // TODO: How do we handle the case when there are no dimensions? + // TODO: How do we handle the case when there are no dimensions? (i.e. regular randomSubsetof(...) 
return dimensionsInMetric.stream().map(attr -> new Tuple<>(attr, randomDimensionValue(attr))).collect(Collectors.toList()); }).toList(); spec = DataGeneratorSpecification.builder() + .withMaxFieldCountPerLevel(0) .withPredefinedFields( List.of( new PredefinedField.WithGenerator( @@ -150,6 +154,8 @@ private static Object randomDimensionValue(String dimensionName) { final DocumentGenerator documentGenerator; final Template template; final Mapping mapping; + final int numTimeSeries; + final List attributesForMetrics; XContentBuilder generateDocument(Map additionalFields) throws IOException { var doc = XContentFactory.jsonBuilder(); @@ -170,15 +176,22 @@ Map, List>> groupedRows( for (XContentBuilder doc : docs) { Map docMap = XContentHelper.convertToMap(BytesReference.bytes(doc), false, XContentType.JSON).v2(); @SuppressWarnings("unchecked") - List groupingValues = groupingAttributes.stream() - .map(attr -> ((Map) docMap.get("attributes")).get(attr).toString()) - .collect(Collectors.toList()); + List groupingPairs = groupingAttributes.stream() + .map( + attr -> Tuple.tuple( + attr, + ((Map) docMap.getOrDefault("attributes", Map.of())).getOrDefault(attr, "").toString() + ) + ) + .filter(val -> val.v2().isEmpty() == false) // Filter out empty values + .map(tup -> tup.v1() + ":" + tup.v2()) + .toList(); // TODO: Verify that this window start calculation is correct. long timeBucketStart = Instant.parse(((String) docMap.get("@timestamp"))).toEpochMilli() / 1000 / secondsInWindow * secondsInWindow; - String key = String.join("|", groupingValues) + "|" + timeBucketStart; - // TODO: Why use this pipe syntax lol - groupedMap.computeIfAbsent(List.of(key), k -> new ArrayList<>()).add(docMap); + var keyList = new ArrayList<>(groupingPairs); + keyList.add(Long.toString(timeBucketStart)); + groupedMap.computeIfAbsent(keyList, k -> new ArrayList<>()).add(docMap); } return groupedMap; } @@ -233,17 +246,17 @@ void putTSDBIndexTemplate( @Before public void populateIndex() throws IOException { - var dataGenHelper = new DataGenerationHelper(); + dataGenerationHelper = new DataGenerationHelper(); final XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.map(dataGenHelper.mapping.raw()); + builder.map(dataGenerationHelper.mapping.raw()); // print the mapping - System.out.println("PABLO Data stream mapping: " + Strings.toString(builder)); + // System.out.println("PABLO Data stream mapping: " + Strings.toString(builder)); final String jsonMappings = Strings.toString(builder); putTSDBIndexTemplate(DATASTREAM_NAME, List.of(DATASTREAM_NAME + "*"), null, jsonMappings, null, null); // Now we can push data into the data stream. for (int i = 0; i < 1000; i++) { - var document = dataGenHelper.generateDocument(Map.of()); + var document = dataGenerationHelper.generateDocument(Map.of()); if (documents == null) { documents = new ArrayList<>(); } @@ -254,19 +267,62 @@ public void populateIndex() throws IOException { } } + public void testGroupBySubset() { + var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics); + var dimensionsStr = dimensions.stream().map(d -> "attributes." 
+ d).collect(Collectors.joining(", ")); + try (var resp = run(String.format(Locale.ROOT, """ + TS %s + | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)), + min(min_over_time(metrics.gauge_hdd.bytes.used)), + avg(avg_over_time(metrics.gauge_hdd.bytes.used)) + BY tbucket=bucket(@timestamp, 1 minute), %s + | SORT tbucket + | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) { + var groups = groupedRows(documents, dimensions, 60); + List> rows = new ArrayList<>(); + resp.rows().forEach(rowIter -> { + List row = new ArrayList<>(); + rowIter.forEach(row::add); + rows.add(row); + }); + // Print rows for now + for (List row : rows) { + var rowGroupingAttributes = row.subList(4, row.size()); + var rowKey = IntStream.range(0, dimensions.size()) + .filter(idx -> rowGroupingAttributes.get(idx) != null) + .mapToObj(idx -> (dimensions.get(idx) + ":" + rowGroupingAttributes.get(idx))) + .collect(Collectors.toList()); + rowKey.add(Long.toString(Instant.parse((String) row.get(3)).toEpochMilli() / 1000)); + var pointsInGroup = groups.get(rowKey); + @SuppressWarnings("unchecked") + var docValues = pointsInGroup.stream() + .map(doc -> ((Map) doc.get("metrics")).get("gauge_hdd.bytes.used")) + .toList(); + docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> { + var res = ((Long) row.getFirst()).intValue(); + assertThat(res, equalTo(maxValue)); + }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); + docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> { + var res = ((Long) row.get(1)).intValue(); + assertThat(res, equalTo(minValue)); + }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); + docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> { + var res = (Double) row.get(2); + assertThat(res, closeTo(avgValue, res * 0.5)); + }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); + } + } + } + public void testGroupByNothing() { - try ( - var resp = run( - String.format( - """ - TS %s - | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)), avg(avg_over_time(metrics.gauge_hdd.bytes.used)), min(min_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute) - | SORT tbucket - | LIMIT 100""", - DATASTREAM_NAME - ) - ) - ) { + try (var resp = run(String.format(Locale.ROOT, """ + TS %s + | STATS + max(max_over_time(metrics.gauge_hdd.bytes.used)), + avg(avg_over_time(metrics.gauge_hdd.bytes.used)), + min(min_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute) + | SORT tbucket + | LIMIT 1000""", DATASTREAM_NAME))) { List> rows = new ArrayList<>(); resp.rows().forEach(rowIter -> { List row = new ArrayList<>(); @@ -274,10 +330,9 @@ public void testGroupByNothing() { rows.add(row); }); var groups = groupedRows(documents, List.of(), 60); - for (int i = 0; i < rows.size(); i++) { - var row = rows.get(i); + for (List row : rows) { var windowStart = Instant.parse((String) row.getLast()).toEpochMilli() / 1000 / 60 * 60; - var windowDataPoints = groups.get(List.of(String.format("|%d", windowStart))); + var windowDataPoints = groups.get(List.of(Long.toString(windowStart))); @SuppressWarnings("unchecked") var docValues = windowDataPoints.stream() .map(doc -> ((Map) doc.get("metrics")).get("gauge_hdd.bytes.used")) From 14b4c1756413261289e4525b00e0124c7b543618 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 8 Aug 2025 17:13:59 -0700 Subject: [PATCH 4/9] smol cleanup --- 
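Note: the change below deduplicates the randomly generated attribute names by round-tripping the list through a set. A minimal sketch of the idea, reusing the test's own helpers:

    // Set.copyOf drops duplicate names produced by randomList;
    // List.copyOf restores an immutable list for indexed access.
    List<String> attributesForMetrics = List.copyOf(
        Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30)))
    );
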
.../org/elasticsearch/xpack/esql/action/GenerativeTSIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java index 9ab7737857645..81f5f242cacbd 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -79,7 +80,8 @@ private static Object randomDimensionValue(String dimensionName) { DataGenerationHelper() { // Metrics coming into our system have a pre-set group of attributes. - attributesForMetrics = ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30)); + // Making a list-to-set-to-list to ensure uniqueness. + attributesForMetrics = List.copyOf(Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30)))); numTimeSeries = ESTestCase.randomIntBetween(10, 50); // TODO: Larger size of timeseries // System.out.println("Total of time series: " + numTimeSeries); // allTimeSeries contains the list of dimension-values for each time series. @@ -108,8 +110,6 @@ private static Object randomDimensionValue(String dimensionName) { return tsDimensions.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2)); } ), - // TODO: How do we add a `metrics` top field that contains all metrics and indexes - // them dynamically? new PredefinedField.WithGenerator( "metrics", FieldType.PASSTHROUGH, From 64624be23382fd5ecd9b304bdfb8caade604a69e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Sat, 9 Aug 2025 00:20:33 +0000 Subject: [PATCH 5/9] [CI] Auto commit changes from spotless --- .../org/elasticsearch/xpack/esql/action/GenerativeTSIT.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java index 81f5f242cacbd..e8147b842062c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java @@ -81,7 +81,9 @@ private static Object randomDimensionValue(String dimensionName) { DataGenerationHelper() { // Metrics coming into our system have a pre-set group of attributes. // Making a list-to-set-to-list to ensure uniqueness. - attributesForMetrics = List.copyOf(Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30)))); + attributesForMetrics = List.copyOf( + Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30))) + ); numTimeSeries = ESTestCase.randomIntBetween(10, 50); // TODO: Larger size of timeseries // System.out.println("Total of time series: " + numTimeSeries); // allTimeSeries contains the list of dimension-values for each time series. 
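The next patch factors the repeated window-start arithmetic into a windowStart helper. The bucketing itself is plain integer flooring of epoch seconds; a minimal standalone sketch (assuming ISO-8601 timestamp strings, which is what the generated documents carry):

    // Test-side mirror of bucket(@timestamp, 1 minute): floor epoch
    // seconds down to the start of the enclosing N-second window.
    static long windowStart(String timestamp, int secondsInWindow) {
        long epochSeconds = java.time.Instant.parse(timestamp).toEpochMilli() / 1000;
        return (epochSeconds / secondsInWindow) * secondsInWindow;
    }
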
From 77a368ba3d7c8ca1f30b967d569d92b7d13228c2 Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 11 Aug 2025 11:04:18 -0700 Subject: [PATCH 6/9] cleanup and ready for first check --- .../xpack/esql/action/GenerativeTSIT.java | 73 +++++++++---------- 1 file changed, 36 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java index e8147b842062c..1193274e1db67 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; -import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -59,11 +58,12 @@ public class GenerativeTSIT extends AbstractEsqlIntegTestCase { + private static final Long NUM_DOCS = 1000L; private static final String DATASTREAM_NAME = "tsit_ds"; private List documents = null; private DataGenerationHelper dataGenerationHelper; - static final class DataGenerationHelper { + private static final class DataGenerationHelper { private static Object randomDimensionValue(String dimensionName) { // We use dimensionName to determine the type of the value. @@ -78,13 +78,14 @@ private static Object randomDimensionValue(String dimensionName) { } } - DataGenerationHelper() { + DataGenerationHelper(long numDocs) { // Metrics coming into our system have a pre-set group of attributes. // Making a list-to-set-to-list to ensure uniqueness. + this.numDocs = numDocs; attributesForMetrics = List.copyOf( - Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(1, 30))) + Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(2, 30))) ); - numTimeSeries = ESTestCase.randomIntBetween(10, 50); // TODO: Larger size of timeseries + numTimeSeries = ESTestCase.randomIntBetween(10, (int) Math.sqrt(numDocs)); // System.out.println("Total of time series: " + numTimeSeries); // allTimeSeries contains the list of dimension-values for each time series. List>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> { @@ -157,6 +158,7 @@ private static Object randomDimensionValue(String dimensionName) { final Template template; final Mapping mapping; final int numTimeSeries; + final long numDocs; final List attributesForMetrics; XContentBuilder generateDocument(Map additionalFields) throws IOException { @@ -188,9 +190,7 @@ Map, List>> groupedRows( .filter(val -> val.v2().isEmpty() == false) // Filter out empty values .map(tup -> tup.v1() + ":" + tup.v2()) .toList(); - // TODO: Verify that this window start calculation is correct. 
- long timeBucketStart = Instant.parse(((String) docMap.get("@timestamp"))).toEpochMilli() / 1000 / secondsInWindow - * secondsInWindow; + long timeBucketStart = windowStart(docMap.get("@timestamp"), secondsInWindow); var keyList = new ArrayList<>(groupingPairs); keyList.add(Long.toString(timeBucketStart)); groupedMap.computeIfAbsent(keyList, k -> new ArrayList<>()).add(docMap); @@ -198,6 +198,23 @@ Map, List>> groupedRows( return groupedMap; } + static Long windowStart(Object timestampCell, int secondsInWindow) { + // The timestamp is in the 4th column (index 3) + return Instant.parse((String) timestampCell).toEpochMilli() / 1000 / secondsInWindow * secondsInWindow; + } + + static List getRowKey(List row, List groupingAttributes) { + List rowKey = new ArrayList<>(); + for (int i = 0; i < groupingAttributes.size(); i++) { + Object value = row.get(i + 4); // Skip the first four columns + if (value != null) { + rowKey.add(groupingAttributes.get(i) + ":" + value); + } + } + rowKey.add(Long.toString(Instant.parse((String) row.get(3)).toEpochMilli() / 1000)); + return rowKey; + } + @Override public EsqlQueryResponse run(EsqlQueryRequest request) { assumeTrue("time series available in snapshot builds only", Build.current().isSnapshot()); @@ -215,31 +232,21 @@ protected Collection> nodePlugins() { ); } - void putTSDBIndexTemplate( - String id, - List patterns, - @Nullable Settings settings, - @Nullable String mappingString, - @Nullable DataStreamLifecycle.Template lifecycle, - @Nullable Map metadata - ) throws IOException { + void putTSDBIndexTemplate(List patterns, @Nullable String mappingString) throws IOException { Settings.Builder settingsBuilder = Settings.builder(); - if (settings != null) { - settingsBuilder.put(settings); - } // Ensure it will be a TSDB data stream settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES); settingsBuilder.putList("index.routing_path", List.of("attributes.*")); CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString); // print the mapping - TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); + TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request( + GenerativeTSIT.DATASTREAM_NAME + ); request.indexTemplate( ComposableIndexTemplate.builder() .indexPatterns(patterns) - .template( - org.elasticsearch.cluster.metadata.Template.builder().settings(settingsBuilder).mappings(mappings).lifecycle(lifecycle) - ) - .metadata(metadata) + .template(org.elasticsearch.cluster.metadata.Template.builder().settings(settingsBuilder).mappings(mappings)) + .metadata(null) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) .build() ); @@ -248,16 +255,14 @@ void putTSDBIndexTemplate( @Before public void populateIndex() throws IOException { - dataGenerationHelper = new DataGenerationHelper(); + dataGenerationHelper = new DataGenerationHelper(NUM_DOCS); final XContentBuilder builder = XContentFactory.jsonBuilder(); builder.map(dataGenerationHelper.mapping.raw()); - // print the mapping - // System.out.println("PABLO Data stream mapping: " + Strings.toString(builder)); final String jsonMappings = Strings.toString(builder); - putTSDBIndexTemplate(DATASTREAM_NAME, List.of(DATASTREAM_NAME + "*"), null, jsonMappings, null, null); + putTSDBIndexTemplate(List.of(DATASTREAM_NAME + "*"), jsonMappings); // Now we can push data into the data stream. 
- for (int i = 0; i < 1000; i++) { + for (int i = 0; i < NUM_DOCS; i++) { var document = dataGenerationHelper.generateDocument(Map.of()); if (documents == null) { documents = new ArrayList<>(); @@ -287,14 +292,8 @@ public void testGroupBySubset() { rowIter.forEach(row::add); rows.add(row); }); - // Print rows for now for (List row : rows) { - var rowGroupingAttributes = row.subList(4, row.size()); - var rowKey = IntStream.range(0, dimensions.size()) - .filter(idx -> rowGroupingAttributes.get(idx) != null) - .mapToObj(idx -> (dimensions.get(idx) + ":" + rowGroupingAttributes.get(idx))) - .collect(Collectors.toList()); - rowKey.add(Long.toString(Instant.parse((String) row.get(3)).toEpochMilli() / 1000)); + var rowKey = getRowKey(row, dimensions); var pointsInGroup = groups.get(rowKey); @SuppressWarnings("unchecked") var docValues = pointsInGroup.stream() @@ -333,7 +332,7 @@ public void testGroupByNothing() { }); var groups = groupedRows(documents, List.of(), 60); for (List row : rows) { - var windowStart = Instant.parse((String) row.getLast()).toEpochMilli() / 1000 / 60 * 60; + var windowStart = windowStart(row.get(3), 60); var windowDataPoints = groups.get(List.of(Long.toString(windowStart))); @SuppressWarnings("unchecked") var docValues = windowDataPoints.stream() From afdac85e6846f14e73f855b060a5b6008d7ee0fe Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 11 Aug 2025 17:18:13 -0700 Subject: [PATCH 7/9] Address comments --- .../datageneration/FieldType.java | 5 +- .../datageneration/MappingGenerator.java | 2 - .../DefaultMappingParametersHandler.java | 2 +- .../xpack/esql/action/GenerativeTSIT.java | 149 +++--------------- .../esql/action/TSDataGenerationHelper.java | 141 +++++++++++++++++ 5 files changed, 169 insertions(+), 130 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java b/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java index 38c7e8bd177fb..4631b1a94bb0f 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/FieldType.java @@ -79,7 +79,7 @@ public FieldDataGenerator generator(String fieldName, DataSource dataSource) { case IP -> new IpFieldDataGenerator(dataSource); case CONSTANT_KEYWORD -> new ConstantKeywordFieldDataGenerator(); case WILDCARD -> new WildcardFieldDataGenerator(dataSource); - default -> null; + case PASSTHROUGH -> throw new IllegalArgumentException("Passthrough field type does not have a default generator"); }; } @@ -103,7 +103,8 @@ public static FieldType tryParse(String name) { case "ip" -> FieldType.IP; case "constant_keyword" -> FieldType.CONSTANT_KEYWORD; case "wildcard" -> FieldType.WILDCARD; - default -> null; + case "passthrough" -> FieldType.PASSTHROUGH; + default -> throw new IllegalArgumentException("Unknown field type: " + name); }; } diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java b/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java index 30cc52c4334b5..0d66e7bd2062b 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/MappingGenerator.java @@ -65,9 +65,7 @@ public Mapping generate(Template template) { rawMapping.put("_doc", topLevelMappingParameters); 
if (specification.fullyDynamicMapping() == false) { - // Has to be "true" for fully dynamic mapping topLevelMappingParameters.remove("dynamic"); - return new Mapping(rawMapping, lookup); } diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java index 36ce5e013a99f..974283362bc93 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultMappingParametersHandler.java @@ -48,7 +48,7 @@ public DataSourceResponse.LeafMappingParametersGenerator handle(DataSourceReques case IP -> ipMapping(); case CONSTANT_KEYWORD -> constantKeywordMapping(); case WILDCARD -> wildcardMapping(); - default -> throw new IllegalArgumentException("Unsupported field type: " + fieldType); + case PASSTHROUGH -> throw new IllegalArgumentException("Unsupported field type: " + fieldType); }); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java index 1193274e1db67..b71deacffaa9e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; -import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressedXContent; @@ -19,14 +18,6 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; -import org.elasticsearch.datageneration.DataGeneratorSpecification; -import org.elasticsearch.datageneration.DocumentGenerator; -import org.elasticsearch.datageneration.FieldType; -import org.elasticsearch.datageneration.Mapping; -import org.elasticsearch.datageneration.MappingGenerator; -import org.elasticsearch.datageneration.Template; -import org.elasticsearch.datageneration.TemplateGenerator; -import org.elasticsearch.datageneration.fields.PredefinedField; import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; @@ -48,9 +39,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.closeTo; @@ -61,115 +50,7 @@ public class GenerativeTSIT extends AbstractEsqlIntegTestCase { private static final Long NUM_DOCS = 1000L; private static final String DATASTREAM_NAME = "tsit_ds"; private List documents = null; - private DataGenerationHelper dataGenerationHelper; - - private static final class DataGenerationHelper { - - private static Object randomDimensionValue(String dimensionName) { - // We use dimensionName to determine the type of the value. 
- var isNumeric = dimensionName.hashCode() % 5 == 0; - if (isNumeric) { - // Numeric values are sometimes passed as integers and sometimes as strings. - return ESTestCase.randomBoolean() - ? ESTestCase.randomIntBetween(1, 1000) - : Integer.toString(ESTestCase.randomIntBetween(1, 1000)); - } else { - return ESTestCase.randomAlphaOfLengthBetween(1, 20); - } - } - - DataGenerationHelper(long numDocs) { - // Metrics coming into our system have a pre-set group of attributes. - // Making a list-to-set-to-list to ensure uniqueness. - this.numDocs = numDocs; - attributesForMetrics = List.copyOf( - Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(2, 30))) - ); - numTimeSeries = ESTestCase.randomIntBetween(10, (int) Math.sqrt(numDocs)); - // System.out.println("Total of time series: " + numTimeSeries); - // allTimeSeries contains the list of dimension-values for each time series. - List>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> { - List dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics); - // TODO: How do we handle the case when there are no dimensions? (i.e. regular randomSubsetof(...) - return dimensionsInMetric.stream().map(attr -> new Tuple<>(attr, randomDimensionValue(attr))).collect(Collectors.toList()); - }).toList(); - - spec = DataGeneratorSpecification.builder() - .withMaxFieldCountPerLevel(0) - .withPredefinedFields( - List.of( - new PredefinedField.WithGenerator( - "@timestamp", - FieldType.DATE, - Map.of("type", "date"), - fieldMapping -> ESTestCase.randomInstantBetween(Instant.now().minusSeconds(2 * 60 * 60), Instant.now()) - ), - new PredefinedField.WithGenerator( - "attributes", - FieldType.PASSTHROUGH, - Map.of("type", "passthrough", "time_series_dimension", true, "dynamic", true, "priority", 1), - (ignored) -> { - var tsDimensions = ESTestCase.randomFrom(allTimeSeries); - return tsDimensions.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2)); - } - ), - new PredefinedField.WithGenerator( - "metrics", - FieldType.PASSTHROUGH, - Map.of("type", "passthrough", "dynamic", true, "priority", 10), - (ignored) -> Map.of("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L)) - ) - ) - ) - .build(); - - documentGenerator = new DocumentGenerator(spec); - template = new TemplateGenerator(spec).generate(); - mapping = new MappingGenerator(spec).generate(template); - var doc = mapping.raw().get("_doc"); - @SuppressWarnings("unchecked") - Map docMap = ((Map) doc); - // Add dynamic templates to the mapping - docMap.put( - "dynamic_templates", - List.of( - Map.of( - "counter_long", - Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "long", "time_series_metric", "counter")) - ), - Map.of( - "gauge_long", - Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge")) - ), - Map.of( - "counter_double", - Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "double", "time_series_metric", "counter")) - ), - Map.of( - "gauge_double", - Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge")) - ) - ) - ); - } - - final DataGeneratorSpecification spec; - final DocumentGenerator documentGenerator; - final Template template; - final Mapping mapping; - final int numTimeSeries; - final long numDocs; - final List attributesForMetrics; - - XContentBuilder generateDocument(Map additionalFields) throws IOException { - var doc = XContentFactory.jsonBuilder(); - var generated = 
documentGenerator.generate(template, mapping); - generated.putAll(additionalFields); - - doc.map(generated); - return doc; - } - } + private TSDataGenerationHelper dataGenerationHelper; Map, List>> groupedRows( List docs, @@ -255,7 +136,7 @@ void putTSDBIndexTemplate(List patterns, @Nullable String mappingString) @Before public void populateIndex() throws IOException { - dataGenerationHelper = new DataGenerationHelper(NUM_DOCS); + dataGenerationHelper = new TSDataGenerationHelper(NUM_DOCS); final XContentBuilder builder = XContentFactory.jsonBuilder(); builder.map(dataGenerationHelper.mapping.raw()); final String jsonMappings = Strings.toString(builder); @@ -274,10 +155,16 @@ public void populateIndex() throws IOException { } } + /** + * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions. + * The subset of dimensions is a random subset of the dimensions present in the data. + * The test checks that the max, min, and avg values of the gauge metric - and calculates + * the same values from the documents in the group. + */ public void testGroupBySubset() { var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics); var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", ")); - try (var resp = run(String.format(Locale.ROOT, """ + try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """ TS %s | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)), min(min_over_time(metrics.gauge_hdd.bytes.used)), @@ -294,19 +181,22 @@ public void testGroupBySubset() { }); for (List row : rows) { var rowKey = getRowKey(row, dimensions); - var pointsInGroup = groups.get(rowKey); + List> pointsInGroup = groups.get(rowKey); @SuppressWarnings("unchecked") var docValues = pointsInGroup.stream() .map(doc -> ((Map) doc.get("metrics")).get("gauge_hdd.bytes.used")) .toList(); + // Verify that the first column is the max value (the query gets max, avg, min in that order) docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> { var res = ((Long) row.getFirst()).intValue(); assertThat(res, equalTo(maxValue)); }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); + // Verify that the second column is the min value (thus why row.get(1)) docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> { var res = ((Long) row.get(1)).intValue(); assertThat(res, equalTo(minValue)); }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); + // Verify that the second column is the avg value (thus why row.get(2)) docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> { var res = (Double) row.get(2); assertThat(res, closeTo(avgValue, res * 0.5)); @@ -315,8 +205,14 @@ public void testGroupBySubset() { } } + /** + * This test validates Gauge metrics aggregation with grouping by time bucket only. + * The test checks that the max, min, and avg values of the gauge metric - and calculates + * the same values from the documents in the group. Because there is no grouping by dimensions, + * there is only one metric group per time bucket. 
+ */ public void testGroupByNothing() { - try (var resp = run(String.format(Locale.ROOT, """ + try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """ TS %s | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)), @@ -333,15 +229,17 @@ public void testGroupByNothing() { var groups = groupedRows(documents, List.of(), 60); for (List row : rows) { var windowStart = windowStart(row.get(3), 60); - var windowDataPoints = groups.get(List.of(Long.toString(windowStart))); + List> windowDataPoints = groups.get(List.of(Long.toString(windowStart))); @SuppressWarnings("unchecked") var docValues = windowDataPoints.stream() .map(doc -> ((Map) doc.get("metrics")).get("gauge_hdd.bytes.used")) .toList(); + // Verify that the first column is the max value (the query gets max, avg, min in that order) docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> { var res = ((Long) row.getFirst()).intValue(); assertThat(res, equalTo(maxValue)); }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); }); + // Verify that the second column is the avg value (thus why row.get(1)) docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> { var res = (Double) row.get(1); assertThat(res, closeTo(avgValue, res * 0.5)); @@ -349,6 +247,7 @@ public void testGroupByNothing() { ; throw new AssertionError("No values found for window starting at " + windowStart); }); + // Verify that the third column is the min value (thus why row.get(2)) docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> { var res = ((Long) row.get(2)).intValue(); assertThat(res, equalTo(minValue)); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java new file mode 100644 index 0000000000000..1f697dfb28263 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.datageneration.DataGeneratorSpecification; +import org.elasticsearch.datageneration.DocumentGenerator; +import org.elasticsearch.datageneration.FieldType; +import org.elasticsearch.datageneration.Mapping; +import org.elasticsearch.datageneration.MappingGenerator; +import org.elasticsearch.datageneration.Template; +import org.elasticsearch.datageneration.TemplateGenerator; +import org.elasticsearch.datageneration.fields.PredefinedField; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +class TSDataGenerationHelper { + + private static Object randomDimensionValue(String dimensionName) { + // We use dimensionName to determine the type of the value. 
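+        // Hashing the name keeps the choice deterministic, so a given dimension holds the same value type across documents.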
+ var isNumeric = dimensionName.hashCode() % 5 == 0; + var isIP = dimensionName.hashCode() % 5 == 1; + if (isNumeric) { + // Numeric values are sometimes passed as integers and sometimes as strings. + return ESTestCase.randomBoolean() + ? ESTestCase.randomIntBetween(1, 1000) + : Integer.toString(ESTestCase.randomIntBetween(1, 1000)); + } else if (isIP) { + // TODO: Make sure the schema ingests this as an IP address. + return NetworkAddress.format(ESTestCase.randomIp(ESTestCase.randomBoolean())); + } else { + return ESTestCase.randomAlphaOfLengthBetween(1, 20); + } + } + + TSDataGenerationHelper(long numDocs) { + // Metrics coming into our system have a pre-set group of attributes. + // Making a list-to-set-to-list to ensure uniqueness. + this.numDocs = numDocs; + attributesForMetrics = List.copyOf(Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(2, 30)))); + numTimeSeries = ESTestCase.randomIntBetween(10, (int) Math.sqrt(numDocs)); + // System.out.println("Total of time series: " + numTimeSeries); + // allTimeSeries contains the list of dimension-values for each time series. + List>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> { + List dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics); + // TODO: How do we handle the case when there are no dimensions? (i.e. regular randomSubsetof(...) + return dimensionsInMetric.stream().map(attr -> new Tuple<>(attr, randomDimensionValue(attr))).collect(Collectors.toList()); + }).toList(); + + spec = DataGeneratorSpecification.builder() + .withMaxFieldCountPerLevel(0) + .withPredefinedFields( + List.of( + new PredefinedField.WithGenerator( + "@timestamp", + FieldType.DATE, + Map.of("type", "date"), + fieldMapping -> ESTestCase.randomInstantBetween(Instant.now().minusSeconds(2 * 60 * 60), Instant.now()) + ), + new PredefinedField.WithGenerator( + "attributes", + FieldType.PASSTHROUGH, + Map.of("type", "passthrough", "time_series_dimension", true, "dynamic", true, "priority", 1), + (ignored) -> { + var tsDimensions = ESTestCase.randomFrom(allTimeSeries); + return tsDimensions.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2)); + } + ), + new PredefinedField.WithGenerator( + "metrics", + FieldType.PASSTHROUGH, + Map.of("type", "passthrough", "dynamic", true, "priority", 10), + (ignored) -> Map.of("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L)) + ) + ) + ) + .build(); + + documentGenerator = new DocumentGenerator(spec); + template = new TemplateGenerator(spec).generate(); + mapping = new MappingGenerator(spec).generate(template); + var doc = mapping.raw().get("_doc"); + @SuppressWarnings("unchecked") + Map docMap = ((Map) doc); + // Add dynamic templates to the mapping + docMap.put( + "dynamic_templates", + List.of( + Map.of( + "counter_long", + Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "long", "time_series_metric", "counter")) + ), + Map.of( + "gauge_long", + Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge")) + ), + Map.of( + "counter_double", + Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "double", "time_series_metric", "counter")) + ), + Map.of( + "gauge_double", + Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge")) + ) + ) + ); + } + + final DataGeneratorSpecification spec; + final DocumentGenerator documentGenerator; + final Template template; + final Mapping mapping; + final int 
numTimeSeries; + final long numDocs; + final List attributesForMetrics; + + XContentBuilder generateDocument(Map additionalFields) throws IOException { + var doc = XContentFactory.jsonBuilder(); + var generated = documentGenerator.generate(template, mapping); + generated.putAll(additionalFields); + + doc.map(generated); + return doc; + } +} From c384e0c413ba2f144b4b6a49b17ec309f492f64c Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 12 Aug 2025 16:23:31 -0700 Subject: [PATCH 8/9] addressing comments --- .../DefaultObjectGenerationHandler.java | 9 +- ...eTSIT.java => RandomizedTimeSeriesIT.java} | 129 +++++++++--------- 2 files changed, 72 insertions(+), 66 deletions(-) rename x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/{GenerativeTSIT.java => RandomizedTimeSeriesIT.java} (69%) diff --git a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java index 7c3842b9c24e1..10a3037b015ad 100644 --- a/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/datageneration/datasource/DefaultObjectGenerationHandler.java @@ -69,15 +69,14 @@ public String generateFieldName() { // UNSIGNED_LONG is excluded because it is mapped as long // and values larger than long fail to parse. private static final Set EXCLUDED_FROM_DYNAMIC_MAPPING = Set.of(FieldType.UNSIGNED_LONG, FieldType.PASSTHROUGH); + private static final Set ALLOWED_FIELD_TYPES = Arrays.stream(FieldType.values()) + .filter(fieldType -> EXCLUDED_FROM_DYNAMIC_MAPPING.contains(fieldType) == false) + .collect(Collectors.toSet()); @Override public DataSourceResponse.FieldTypeGenerator handle(DataSourceRequest.FieldTypeGenerator request) { return new DataSourceResponse.FieldTypeGenerator(() -> { - // All field types minus the excluded ones. 
- var fieldTypes = Arrays.stream(FieldType.values()) - .filter(fieldType -> EXCLUDED_FROM_DYNAMIC_MAPPING.contains(fieldType) == false) - .collect(Collectors.toSet()); - var fieldType = ESTestCase.randomFrom(fieldTypes); + var fieldType = ESTestCase.randomFrom(ALLOWED_FIELD_TYPES); return new DataSourceResponse.FieldTypeGenerator.FieldTypeInfo(fieldType.toString()); }); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java similarity index 69% rename from x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java rename to x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java index b71deacffaa9e..55565e3256db9 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/GenerativeTSIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java @@ -39,19 +39,30 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; -public class GenerativeTSIT extends AbstractEsqlIntegTestCase { +public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase { - private static final Long NUM_DOCS = 1000L; + private static final Long NUM_DOCS = 2500L; private static final String DATASTREAM_NAME = "tsit_ds"; private List documents = null; private TSDataGenerationHelper dataGenerationHelper; + List> consumeRows(EsqlQueryResponse resp) { + List> rows = new ArrayList<>(); + resp.rows().forEach(rowIter -> { + List row = new ArrayList<>(); + rowIter.forEach(row::add); + rows.add(row); + }); + return rows; + } + Map, List>> groupedRows( List docs, List groupingAttributes, @@ -80,10 +91,38 @@ Map, List>> groupedRows( } static Long windowStart(Object timestampCell, int secondsInWindow) { - // The timestamp is in the 4th column (index 3) + // This calculation looks a little weird, but it simply performs an integer division that + // throws away the remainder of the division by secondsInWindow. It rounds down + // the timestamp to the nearest multiple of secondsInWindow. 
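+        // For example, with secondsInWindow = 60 a timestamp of 10:07:45 becomes 10:07:00.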
return Instant.parse((String) timestampCell).toEpochMilli() / 1000 / secondsInWindow * secondsInWindow; } + enum Agg { + MAX, + MIN, + AVG + } + + static List valuesInWindow(List> pointsInGroup, String metricName) { + @SuppressWarnings("unchecked") + var values = pointsInGroup.stream() + .map(doc -> ((Map) doc.get("metrics")).get(metricName)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + return values; + } + + static Double aggregateValuesInWindow(List values, Agg agg) { + if (values.isEmpty()) { + throw new IllegalArgumentException("No values to aggregate for " + agg + " operation"); + } + return switch (agg) { + case MAX -> Double.valueOf(values.stream().max(Integer::compareTo).orElseThrow()); + case MIN -> Double.valueOf(values.stream().min(Integer::compareTo).orElseThrow()); + case AVG -> values.stream().mapToDouble(Integer::doubleValue).average().orElseThrow(); + }; + } + static List getRowKey(List row, List groupingAttributes) { List rowKey = new ArrayList<>(); for (int i = 0; i < groupingAttributes.size(); i++) { @@ -121,7 +160,7 @@ void putTSDBIndexTemplate(List patterns, @Nullable String mappingString) CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString); // print the mapping TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request( - GenerativeTSIT.DATASTREAM_NAME + RandomizedTimeSeriesIT.DATASTREAM_NAME ); request.indexTemplate( ComposableIndexTemplate.builder() @@ -173,34 +212,20 @@ public void testGroupBySubset() { | SORT tbucket | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) { var groups = groupedRows(documents, dimensions, 60); - List> rows = new ArrayList<>(); - resp.rows().forEach(rowIter -> { - List row = new ArrayList<>(); - rowIter.forEach(row::add); - rows.add(row); - }); + List> rows = consumeRows(resp); for (List row : rows) { var rowKey = getRowKey(row, dimensions); - List> pointsInGroup = groups.get(rowKey); - @SuppressWarnings("unchecked") - var docValues = pointsInGroup.stream() - .map(doc -> ((Map) doc.get("metrics")).get("gauge_hdd.bytes.used")) - .toList(); - // Verify that the first column is the max value (the query gets max, avg, min in that order) - docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> { - var res = ((Long) row.getFirst()).intValue(); - assertThat(res, equalTo(maxValue)); - }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); - // Verify that the second column is the min value (thus why row.get(1)) - docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> { - var res = ((Long) row.get(1)).intValue(); - assertThat(res, equalTo(minValue)); - }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); - // Verify that the second column is the avg value (thus why row.get(2)) - docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> { - var res = (Double) row.get(2); - assertThat(res, closeTo(avgValue, res * 0.5)); - }, () -> { throw new AssertionError("No values found for group: " + rowKey); }); + var docValues = valuesInWindow(groups.get(rowKey), "gauge_hdd.bytes.used"); + // Max of int is always int, so we can safely round the result. + assertThat(row.getFirst(), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX)))); + assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN)))); + // We check the expected vs ES-calculated average. 
+                // We divide them to normalize the error
+                // and allow for a 20% error margin.
+                Double esAvg = (Double) row.get(2);
+                Double expectedAvg = aggregateValuesInWindow(docValues, Agg.AVG);
+                var ratio = esAvg / expectedAvg;
+                assertThat(ratio, closeTo(1, 0.2));
+
             }
         }
     }
@@ -216,42 +241,24 @@ public void testGroupByNothing() {
             TS %s
             | STATS max(max_over_time(metrics.gauge_hdd.bytes.used)),
-                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
-                min(min_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute)
+                min(min_over_time(metrics.gauge_hdd.bytes.used)),
+                avg(avg_over_time(metrics.gauge_hdd.bytes.used)) BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
             | LIMIT 1000""", DATASTREAM_NAME))) {
-            List<List<Object>> rows = new ArrayList<>();
-            resp.rows().forEach(rowIter -> {
-                List<Object> row = new ArrayList<>();
-                rowIter.forEach(row::add);
-                rows.add(row);
-            });
+            List<List<Object>> rows = consumeRows(resp);
             var groups = groupedRows(documents, List.of(), 60);
             for (List<Object> row : rows) {
                 var windowStart = windowStart(row.get(3), 60);
-                List<Map<String, Object>> windowDataPoints = groups.get(List.of(Long.toString(windowStart)));
-                @SuppressWarnings("unchecked")
-                var docValues = windowDataPoints.stream()
-                    .map(doc -> ((Map<String, Integer>) doc.get("metrics")).get("gauge_hdd.bytes.used"))
-                    .toList();
-                // Verify that the first column is the max value (the query gets max, avg, min in that order)
-                docValues.stream().max(Integer::compareTo).ifPresentOrElse(maxValue -> {
-                    var res = ((Long) row.getFirst()).intValue();
-                    assertThat(res, equalTo(maxValue));
-                }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); });
-                // Verify that the second column is the avg value (thus why row.get(1))
-                docValues.stream().mapToDouble(Integer::doubleValue).average().ifPresentOrElse(avgValue -> {
-                    var res = (Double) row.get(1);
-                    assertThat(res, closeTo(avgValue, res * 0.5));
-                }, () -> {
-                    ;
-                    throw new AssertionError("No values found for window starting at " + windowStart);
-                });
-                // Verify that the third column is the min value (thus why row.get(2))
-                docValues.stream().min(Integer::compareTo).ifPresentOrElse(minValue -> {
-                    var res = ((Long) row.get(2)).intValue();
-                    assertThat(res, equalTo(minValue));
-                }, () -> { throw new AssertionError("No values found for window starting at " + windowStart); });
+                var docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
+                // Min and Max of int are always int, so we can safely round the result.
+                assertThat(row.getFirst(), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX))));
+                assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN))));
+                // We check the expected vs ES-calculated average. We divide them to normalize the error
+                // and allow for a 20% error margin.
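+                // Comparing the ratio of the two averages (rather than their absolute
+                // difference) keeps the tolerance scale-free across randomized gauge values.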
+ Double esAvg = (Double) row.get(2); + Double expectedAvg = aggregateValuesInWindow(docValues, Agg.AVG); + var ratio = esAvg / expectedAvg; + assertThat(ratio, closeTo(1, 0.2)); } } } From f5bfbf1765dc48355d0bd9447bdf788151b92eed Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 12 Aug 2025 16:33:21 -0700 Subject: [PATCH 9/9] more addressing comments --- .../xpack/esql/action/RandomizedTimeSeriesIT.java | 5 ++--- .../xpack/esql/action/TSDataGenerationHelper.java | 14 ++------------ 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java index 55565e3256db9..25d43d218e116 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java @@ -94,7 +94,8 @@ static Long windowStart(Object timestampCell, int secondsInWindow) { // This calculation looks a little weird, but it simply performs an integer division that // throws away the remainder of the division by secondsInWindow. It rounds down // the timestamp to the nearest multiple of secondsInWindow. - return Instant.parse((String) timestampCell).toEpochMilli() / 1000 / secondsInWindow * secondsInWindow; + var timestampSeconds = Instant.parse((String) timestampCell).toEpochMilli() / 1000; + return (timestampSeconds / secondsInWindow) * secondsInWindow; } enum Agg { @@ -156,9 +157,7 @@ void putTSDBIndexTemplate(List patterns, @Nullable String mappingString) Settings.Builder settingsBuilder = Settings.builder(); // Ensure it will be a TSDB data stream settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES); - settingsBuilder.putList("index.routing_path", List.of("attributes.*")); CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString); - // print the mapping TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request( RandomizedTimeSeriesIT.DATASTREAM_NAME ); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java index 1f697dfb28263..8961419f6b6ba 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java @@ -38,9 +38,7 @@ private static Object randomDimensionValue(String dimensionName) { var isIP = dimensionName.hashCode() % 5 == 1; if (isNumeric) { // Numeric values are sometimes passed as integers and sometimes as strings. - return ESTestCase.randomBoolean() - ? ESTestCase.randomIntBetween(1, 1000) - : Integer.toString(ESTestCase.randomIntBetween(1, 1000)); + return ESTestCase.randomIntBetween(1, 1000); } else if (isIP) { // TODO: Make sure the schema ingests this as an IP address. 
return NetworkAddress.format(ESTestCase.randomIp(ESTestCase.randomBoolean())); @@ -55,7 +53,6 @@ private static Object randomDimensionValue(String dimensionName) { this.numDocs = numDocs; attributesForMetrics = List.copyOf(Set.copyOf(ESTestCase.randomList(1, 300, () -> ESTestCase.randomAlphaOfLengthBetween(2, 30)))); numTimeSeries = ESTestCase.randomIntBetween(10, (int) Math.sqrt(numDocs)); - // System.out.println("Total of time series: " + numTimeSeries); // allTimeSeries contains the list of dimension-values for each time series. List>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> { List dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics); @@ -109,15 +106,8 @@ private static Object randomDimensionValue(String dimensionName) { Map.of( "gauge_long", Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge")) - ), - Map.of( - "counter_double", - Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "double", "time_series_metric", "counter")) - ), - Map.of( - "gauge_double", - Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge")) ) + // TODO: Add double and other metric types ) ); }