diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
index d9881f0c2f172..4688a825ede8b 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
@@ -43,15 +44,22 @@
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
 
 @SuppressWarnings("unchecked")
+@ESIntegTestCase.ClusterScope(maxNumDataNodes = 1)
 public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
 
     private static final Long NUM_DOCS = 2000L;
+    private static final Long TIME_RANGE_SECONDS = 3600L;
     private static final String DATASTREAM_NAME = "tsit_ds";
-    private List<Map<String, Object>> documents = null;
+    private static final Integer SECONDS_IN_WINDOW = 60;
+    private List<Map<String, Object>> documents;
     private TSDataGenerationHelper dataGenerationHelper;
 
     List<List<Object>> consumeRows(EsqlQueryResponse resp) {
@@ -103,7 +111,8 @@ enum Agg {
         MAX,
         MIN,
         AVG,
-        SUM
+        SUM,
+        COUNT
     }
 
     static List<Integer> valuesInWindow(List<Map<String, Object>> pointsInGroup, String metricName) {
@@ -115,15 +124,61 @@ static List<Integer> valuesInWindow(List<Map<String, Object>> pointsInGroup, Str
         return values;
     }
 
-    static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
-        if (values.isEmpty()) {
-            throw new IllegalArgumentException("No values to aggregate for " + agg + " operation");
-        }
+    static Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> groupByTimeseries(
+        List<Map<String, Object>> pointsInGroup,
+        String metricName
+    ) {
+        return pointsInGroup.stream()
+            .filter(doc -> doc.containsKey("metrics") && ((Map<String, Object>) doc.get("metrics")).containsKey(metricName))
+            .map(doc -> {
+                String docKey = ((Map<String, Object>) doc.get("attributes")).entrySet()
+                    .stream()
+                    .map(entry -> entry.getKey() + ":" + entry.getValue())
+                    .collect(Collectors.joining(","));
+                var docTs = Instant.parse((String) doc.get("@timestamp"));
+                var docValue = (Integer) ((Map<String, Object>) doc.get("metrics")).get(metricName);
+                return new Tuple<>(docKey, new Tuple<>(docTs, docValue));
+            })
+            .collect(Collectors.groupingBy(Tuple::v1));
+    }
+
+    static Object aggregatePerTimeseries(
+        Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> timeseries,
+        Agg crossAgg,
+        Agg timeseriesAgg
+    ) {
+        var res = timeseries.values().stream().map(timeseriesList -> {
+            List<Integer> values = timeseriesList.stream().map(t -> t.v2().v2()).collect(Collectors.toList());
+            return aggregateValuesInWindow(values, timeseriesAgg);
+        }).filter(Objects::nonNull).toList();
+
+        if (res.isEmpty() && timeseriesAgg == Agg.COUNT) {
+            res = List.of(0.0);
+        }
+
+        return switch (crossAgg) {
+            case MAX -> res.isEmpty()
+                ? null
+                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).max().orElseThrow()).longValue();
+            case MIN -> res.isEmpty()
+                ? null
+                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).min().orElseThrow()).longValue();
+            case AVG -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
+            case SUM -> res.isEmpty() ? null : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).sum()).longValue();
+            case COUNT -> Integer.toUnsignedLong(res.size());
+        };
+    }
+
+    static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
         return switch (agg) {
             case MAX -> Double.valueOf(values.stream().max(Integer::compareTo).orElseThrow());
             case MIN -> Double.valueOf(values.stream().min(Integer::compareTo).orElseThrow());
             case AVG -> values.stream().mapToDouble(Integer::doubleValue).average().orElseThrow();
-            case SUM -> values.stream().mapToDouble(Integer::doubleValue).sum();
+            case SUM -> values.isEmpty() ? null : values.stream().mapToDouble(Integer::doubleValue).sum();
+            case COUNT -> (double) values.size();
         };
     }
 
@@ -150,10 +205,87 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
         return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, AggregateMetricMapperPlugin.class, EsqlPlugin.class);
     }
 
+    record RateRange(Double lower, Double upper) implements Comparable<RateRange> {
+        @Override
+        public int compareTo(RateRange o) {
+            // Compare first by lower bound, then by upper bound
+            int cmp = this.lower.compareTo(o.lower);
+            if (cmp == 0) {
+                return this.upper.compareTo(o.upper);
+            }
+            return cmp;
+        }
+
+        public int compareToFindingMax(RateRange o) {
+            // Compare first by upper bound, then by lower bound
+            int cmp = this.upper.compareTo(o.upper);
+            if (cmp == 0) {
+                return this.lower.compareTo(o.lower);
+            }
+            return cmp;
+        }
+    }
+
+    // Holds the count, max, avg, min, and sum of the rates calculated from a group of time series.
+    record RateStats(Long count, RateRange max, RateRange avg, RateRange min, RateRange sum) {}
+
+    static RateStats calculateRateAggregation(
+        Collection<List<Tuple<String, Tuple<Instant, Integer>>>> allTimeseries,
+        Integer secondsInWindow
+    ) {
+        List<RateRange> allRates = allTimeseries.stream().map(timeseries -> {
+            if (timeseries.size() < 2) {
+                return null;
+            }
+            // Sort the timeseries by timestamp
+            timeseries.sort((t1, t2) -> t1.v2().v1().compareTo(t2.v2().v1()));
+            var firstTs = timeseries.getFirst().v2().v1();
+            var lastTs = timeseries.getLast().v2().v1();
+            Integer lastValue = null;
+            Double counterGrowth = 0.0;
+            for (Tuple<String, Tuple<Instant, Integer>> point : timeseries) {
+                var currentValue = point.v2().v2();
+                if (currentValue == null) {
+                    throw new IllegalArgumentException("Null value in counter timeseries");
+                }
+                if (lastValue == null) {
+                    lastValue = point.v2().v2(); // Initialize with the first value
+                    continue;
+                }
+                if (currentValue > lastValue) {
+                    counterGrowth += currentValue - lastValue; // Incremental growth
+                } else if (currentValue < lastValue) {
+                    // If the value decreased, we assume a reset and start counting from the current value
+                    counterGrowth += currentValue;
+                }
+                lastValue = currentValue; // Update last value for next iteration
+            }
+            return new RateRange(
+                counterGrowth / secondsInWindow * 0.99, // growth over the full window, with 1% tolerance on the lower bound
+                1000.0 * counterGrowth / (lastTs.toEpochMilli() - firstTs.toEpochMilli()) * 1.01 // growth over the observed span, with 1% tolerance on the upper bound
+            );
+        }).filter(Objects::nonNull).toList();
+        if (allRates.isEmpty()) {
+            return new RateStats(0L, null, null, null, new RateRange(0.0, 0.0));
+        }
+        return new RateStats(
+            (long) allRates.size(),
+            allRates.stream().max(RateRange::compareToFindingMax).orElseThrow(),
+            new RateRange(
+                allRates.stream().mapToDouble(r -> r.lower).average().orElseThrow(),
+                allRates.stream().mapToDouble(r -> r.upper).average().orElseThrow()
+            ),
+            allRates.stream().min(RateRange::compareTo).orElseThrow(),
+            new RateRange(allRates.stream().mapToDouble(r -> r.lower).sum(), allRates.stream().mapToDouble(r -> r.upper).sum())
+        );
+    }
+
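+    // Worked example (added commentary): a single time series sampled at t=0s, 20s, 40s with values
+    // 5 -> 9 -> 2 grows by (9 - 5) = 4, then the drop to 2 is treated as a counter reset and contributes
+    // the post-reset value 2, so counterGrowth = 6. With secondsInWindow = 60 the resulting bounds are
+    // lower = 6 / 60 * 0.99 and upper = 1000.0 * 6 / 40000 * 1.01, bracketing the per-second rate.
+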
     void putTSDBIndexTemplate(List<String> patterns, @Nullable String mappingString) throws IOException {
         Settings.Builder settingsBuilder = Settings.builder();
         // Ensure it will be a TSDB data stream
         settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES);
+        settingsBuilder.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), "2025-07-31T00:00:00Z");
+        settingsBuilder.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2025-07-31T12:00:00Z");
         CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString);
         TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(
             RandomizedTimeSeriesIT.DATASTREAM_NAME
@@ -171,7 +303,7 @@ void putTSDBIndexTemplate(List<String> patterns, @Nullable String mappingString)
 
     @Before
     public void populateIndex() throws IOException {
-        dataGenerationHelper = new TSDataGenerationHelper(NUM_DOCS);
+        dataGenerationHelper = new TSDataGenerationHelper(NUM_DOCS, TIME_RANGE_SECONDS);
         final XContentBuilder builder = XContentFactory.jsonBuilder();
         builder.map(dataGenerationHelper.mapping.raw());
         final String jsonMappings = Strings.toString(builder);
@@ -190,6 +322,103 @@ public void populateIndex() throws IOException {
         }
     }
 
+    void checkWithin(Double actual, RateRange expected) {
+        if (expected == null) {
+            assertThat(actual, equalTo(null));
+            return;
+        }
+        assertThat(actual, allOf(lessThanOrEqualTo(expected.upper), not(lessThan(expected.lower))));
+    }
+
+    void assertNoFailedWindows(List<String> failedWindows, List<List<Object>> rows) {
+        if (failedWindows.isEmpty() == false) {
+            var pctFailures = (double) failedWindows.size() / rows.size() * 100;
+            var failureDetails = String.join("\n", failedWindows);
+            if (failureDetails.length() > 2000) {
+                failureDetails = failureDetails.substring(0, 2000) + "\n... (truncated)";
+            }
+            throw new AssertionError("Failed " + failedWindows.size() + " windows (" + pctFailures + "%):\n" + failureDetails);
+        }
+    }
+
+    /**
+     * This test validates Rate metrics aggregation with grouping by time bucket and a subset of dimensions.
+     * The subset of dimensions is a random subset of the dimensions present in the data.
+     * The test fetches the count, max, min, and avg of the rate metric via ES|QL and compares them
+     * with the same values calculated from the documents in each group.
+     */
+    public void testRateGroupBySubset() {
+        var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics);
+        var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
+        try (var resp = run(String.format(Locale.ROOT, """
+            TS %s
+            | STATS count(rate(metrics.counter_hdd.bytes.read)),
+                max(rate(metrics.counter_hdd.bytes.read)),
+                avg(rate(metrics.counter_hdd.bytes.read)),
+                min(rate(metrics.counter_hdd.bytes.read))
+                BY tbucket=bucket(@timestamp, 1 minute), %s
+            | SORT tbucket
+            | LIMIT 1000
+            """, DATASTREAM_NAME, dimensionsStr))) {
+            List<List<Object>> rows = consumeRows(resp);
+            List<String> failedWindows = new ArrayList<>();
+            var groups = groupedRows(documents, dimensions, SECONDS_IN_WINDOW);
+            for (List<Object> row : rows) {
+                var rowKey = getRowKey(row, dimensions, 4);
+                var windowDataPoints = groups.get(rowKey);
+                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
+                var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
+                try {
+                    assertThat(row.getFirst(), equalTo(rateAgg.count));
+                    checkWithin((Double) row.get(1), rateAgg.max);
+                    checkWithin((Double) row.get(2), rateAgg.avg);
+                    checkWithin((Double) row.get(3), rateAgg.min);
+                } catch (AssertionError e) {
+                    failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage());
+                }
+            }
+            assertNoFailedWindows(failedWindows, rows);
+        }
+    }
+
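+    // Note (added commentary): the rate assertions use RateRange bounds rather than exact values.
+    // The test only observes the sampled points, while rate() may extrapolate across the bucket, so
+    // the true denominator lies somewhere between the observed span and the full window; the two
+    // bounds (each with 1% slack) bracket every value the implementation could reasonably return.
+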
+    /**
+     * This test validates Rate metrics aggregation with grouping by time bucket only.
+     * The test fetches the count, max, min, and avg of the rate metric via ES|QL and compares them
+     * with the same values calculated from the documents in each group. Because there is no grouping
+     * by dimensions, there is only one metric group per time bucket.
+     */
+    public void testRateGroupByNothing() {
+        var groups = groupedRows(documents, List.of(), SECONDS_IN_WINDOW);
+        try (var resp = run(String.format(Locale.ROOT, """
+            TS %s
+            | STATS count(rate(metrics.counter_hdd.bytes.read)),
+                max(rate(metrics.counter_hdd.bytes.read)),
+                avg(rate(metrics.counter_hdd.bytes.read)),
+                min(rate(metrics.counter_hdd.bytes.read))
+                BY tbucket=bucket(@timestamp, 1 minute)
+            | SORT tbucket
+            | LIMIT 1000
+            """, DATASTREAM_NAME))) {
+            List<List<Object>> rows = consumeRows(resp);
+            List<String> failedWindows = new ArrayList<>();
+            for (List<Object> row : rows) {
+                var windowStart = windowStart(row.get(4), SECONDS_IN_WINDOW);
+                var windowDataPoints = groups.get(List.of(Long.toString(windowStart)));
+                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
+                var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
+                try {
+                    assertThat(row.getFirst(), equalTo(rateAgg.count));
+                    checkWithin((Double) row.get(1), rateAgg.max);
+                    checkWithin((Double) row.get(2), rateAgg.avg);
+                    checkWithin((Double) row.get(3), rateAgg.min);
+                } catch (AssertionError e) {
+                    failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage());
+                }
+            }
+            assertNoFailedWindows(failedWindows, rows);
+        }
+    }
+
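+    // Illustrative note (added commentary): groupedRows buckets documents into SECONDS_IN_WINDOW-sized
+    // windows keyed by the window start, so a row whose tbucket is 10:05:00Z is checked against the
+    // documents whose @timestamp falls in [10:05:00Z, 10:06:00Z).
+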
     /**
      * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions.
      * The subset of dimensions is a random subset of the dimensions present in the data.
@@ -207,29 +436,32 @@ public void testGroupBySubset() {
                     min(min_over_time(metrics.gauge_hdd.bytes.used)),
                     sum(count_over_time(metrics.gauge_hdd.bytes.used)),
                     sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                    avg(avg_over_time(metrics.gauge_hdd.bytes.used))
+                    avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
+                    count(count_over_time(metrics.gauge_hdd.bytes.used))
                     BY tbucket=bucket(@timestamp, 1 minute), %s
                 | SORT tbucket
                 | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) {
             var groups = groupedRows(documents, dimensions, 60);
             List<List<Object>> rows = consumeRows(resp);
             for (List<Object> row : rows) {
-                var rowKey = getRowKey(row, dimensions, 6);
+                var rowKey = getRowKey(row, dimensions, 7);
+                var tsGroups = groupByTimeseries(groups.get(rowKey), "gauge_hdd.bytes.used");
                 var docValues = valuesInWindow(groups.get(rowKey), "gauge_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
-                        (Collection<Long>) row.get(0),
+                        (Collection<Long>) row.getFirst(),
                         containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new))
                     );
                 } else {
-                    assertThat(row.get(0), equalTo(docValues.getFirst().longValue()));
+                    assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
                 }
-                assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX))));
-                assertThat(row.get(2), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN))));
-                assertThat(row.get(3), equalTo((long) docValues.size()));
-                assertThat(row.get(4), equalTo(aggregateValuesInWindow(docValues, Agg.SUM).longValue()));
-                // TODO: fix then enable
-                // assertThat(row.get(5), equalTo(aggregateValuesInWindow(docValues, Agg.SUM) / (double) docValues.size()));
+                assertThat(row.get(1), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
+                assertThat(row.get(2), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
+                assertThat(row.get(3), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
+                assertThat(row.get(4), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
+                var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
+                assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
+                // TODO: fix then enable
+                // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
             }
         }
     }
@@ -249,29 +481,32 @@ public void testGroupByNothing() {
                     min(min_over_time(metrics.gauge_hdd.bytes.used)),
                     sum(count_over_time(metrics.gauge_hdd.bytes.used)),
                     sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                    avg(avg_over_time(metrics.gauge_hdd.bytes.used))
+                    avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
+                    count(count_over_time(metrics.gauge_hdd.bytes.used))
                     BY tbucket=bucket(@timestamp, 1 minute)
                 | SORT tbucket
                 | LIMIT 1000""", DATASTREAM_NAME))) {
             List<List<Object>> rows = consumeRows(resp);
             var groups = groupedRows(documents, List.of(), 60);
             for (List<Object> row : rows) {
-                var windowStart = windowStart(row.get(6), 60);
+                var windowStart = windowStart(row.get(7), 60);
                 List<Integer> docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
+                var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
                         (Collection<Long>) row.get(0),
                         containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new))
                     );
                 } else {
-                    assertThat(row.get(0), equalTo(docValues.getFirst().longValue()));
+                    assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
                 }
-                assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX))));
-                assertThat(row.get(2), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN))));
-                assertThat(row.get(3), equalTo((long) docValues.size()));
-                assertThat(row.get(4), equalTo(aggregateValuesInWindow(docValues, Agg.SUM).longValue()));
-                // TODO: fix then enable
-                // assertThat(row.get(5), equalTo(aggregateValuesInWindow(docValues, Agg.SUM) / (double) docValues.size()));
+                assertThat(row.get(1), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
+                assertThat(row.get(2), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
+                assertThat(row.get(3), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
+                assertThat(row.get(4), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
+                var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
+                assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
+                // TODO: fix then enable
+                // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
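+                // Note (added commentary): in aggregatePerTimeseries the second Agg runs within each
+                // time series and the first runs across series; e.g. (Agg.SUM, Agg.COUNT) sums the
+                // per-series document counts, which is what sum(count_over_time(...)) returns.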
             }
         }
     }
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
index 17904e5e28836..5b0914244fe7c 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -48,34 +49,37 @@ private static Object randomDimensionValue(String dimensionName) {
         }
     }
 
-    TSDataGenerationHelper(long numDocs) {
+    TSDataGenerationHelper(long numDocs, long timeRangeSeconds) {
         // Metrics coming into our system have a pre-set group of attributes.
         // Making a list-to-set-to-list to ensure uniqueness.
         this.numDocs = numDocs;
         var maxAttributes = (int) Math.sqrt(numDocs);
-        attributesForMetrics = List.copyOf(
+        List<String> tempAttributeSet = List.copyOf(
            Set.copyOf(ESTestCase.randomList(1, maxAttributes, () -> ESTestCase.randomAlphaOfLengthBetween(2, 30)))
         );
         var maxTimeSeries = (int) Math.sqrt(numDocs);
         var minTimeSeries = Math.max(1, maxTimeSeries / 4);
         numTimeSeries = ESTestCase.randomIntBetween(minTimeSeries, maxTimeSeries);
+        Set<String> usedAttributeNames = new HashSet<>();
         // allTimeSeries contains the list of dimension-values for each time series.
         List<List<Tuple<String, Object>>> allTimeSeries = IntStream.range(0, numTimeSeries).mapToObj(tsIdx -> {
-            List<String> dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(attributesForMetrics);
+            List<String> dimensionsInMetric = ESTestCase.randomNonEmptySubsetOf(tempAttributeSet);
             // TODO: How do we handle the case when there are no dimensions? (i.e. regular randomSubsetOf(...))
+            usedAttributeNames.addAll(dimensionsInMetric);
             return dimensionsInMetric.stream().map(attr -> new Tuple<>(attr, randomDimensionValue(attr))).collect(Collectors.toList());
         }).toList();
+        attributesForMetrics = List.copyOf(usedAttributeNames);
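+        // Note (added commentary): attributesForMetrics is rebuilt from usedAttributeNames so that the
+        // tests only group by attribute names that at least one generated time series actually uses.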
         // We want to ensure that all documents have different timestamps.
-        var now = Instant.now();
+        var timeRangeMs = timeRangeSeconds * 1000;
+        var timeRangeEnd = Instant.parse("2025-07-31T10:00:00Z").toEpochMilli();
+        var timeRangeStart = timeRangeEnd - timeRangeMs;
         var timestampSet = new HashSet<Instant>();
         var regens = 0;
         for (int i = 0; i < numDocs; i++) {
-            // Random timestamps within the last 90 days.
+            // Random timestamps within the configured time range.
             while (true) {
-                var randomIns = Instant.ofEpochMilli(
-                    ESTestCase.randomLongBetween(now.minusSeconds(60 * 60 * 2).toEpochMilli(), now.toEpochMilli())
-                );
+                var randomIns = Instant.ofEpochMilli(ESTestCase.randomLongBetween(timeRangeStart, timeRangeEnd));
                 if (timestampSet.add(randomIns)) {
                     break;
                 }
@@ -112,13 +116,25 @@ private static Object randomDimensionValue(String dimensionName) {
                     "metrics",
                     FieldType.PASSTHROUGH,
                     Map.of("type", "passthrough", "dynamic", true, "priority", 10),
-                    (ignored) -> Map.of("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L))
+
+                    (ignored) -> {
+                        var res = new HashMap<String, Object>();
+                        res.put("counter_hdd.bytes.read", Randomness.get().nextLong(0, 1000L)); // counter present in every document
+                        // Randomly add one more metric: a second counter or one of the gauges.
+                        switch (ESTestCase.randomIntBetween(0, 2)) {
+                            case 0 -> res.put("counter_kwh.consumed", Randomness.get().nextDouble(0, 1000000));
+                            case 1 -> res.put("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L));
+                            case 2 -> res.put("gauge_cpu.percent", Randomness.get().nextDouble(0, 100));
+                        }
+                        return res;
+                    }
                 )
             )
         )
         .build();
 
         documentGenerator = new DocumentGenerator(spec);
+        template = new TemplateGenerator(spec).generate();
         mapping = new MappingGenerator(spec).generate(template);
 
         var doc = mapping.raw().get("_doc");
@@ -132,11 +148,18 @@ private static Object randomDimensionValue(String dimensionName) {
                 Map.of(
                     "counter_long",
                     Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "long", "time_series_metric", "counter"))
                 ),
+                Map.of(
+                    "counter_double",
+                    Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "double", "time_series_metric", "counter"))
+                ),
                 Map.of(
                     "gauge_long",
                     Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge"))
+                ),
+                Map.of(
+                    "gauge_double",
+                    Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge"))
                 )
-                // TODO: Add double and other metric types
             )
         );
     }