diff --git a/docs/changelog/127752.yaml b/docs/changelog/127752.yaml new file mode 100644 index 0000000000000..b4721af3431a7 --- /dev/null +++ b/docs/changelog/127752.yaml @@ -0,0 +1,6 @@ +pr: 127752 +summary: Downsampling does not consider passthrough fields as dimensions +area: Downsampling +type: bug +issues: + - 125156 diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index afa2e95e1284c..c34e3a811f590 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -19,35 +19,23 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; -import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; -import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices; -import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4) -public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase { +public class DataStreamLifecycleDownsampleDisruptionIT extends DownsamplingIntegTestCase { private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class); public static final int DOC_COUNT = 50_000; - @Override - protected Collection<Class<? extends Plugin>> nodePlugins() { - return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class); - } - @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)); @@ -76,8 +64,7 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { ) ) .build(); - DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs( - client(), + setupTSDBDataStreamAndIngestDocs( dataStreamName, "1986-01-08T23:40:53.384Z", "2022-01-08T23:40:53.384Z", @@ -88,15 +75,15 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with // testing so DSL doesn't have to wait for the end_time to lapse) - putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle); - client().execute(RolloverAction.INSTANCE,
new RolloverRequest(dataStreamName, null)).actionGet(); + putTSDBIndexTemplate(dataStreamName, null, null, lifecycle); + safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null))); // DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts // downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution. long sleepTime = randomLongBetween(3000, 4500); logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime); Thread.sleep(sleepTime); - List<String> backingIndices = getBackingIndices(client(), dataStreamName); + List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName); // first generation index String sourceIndex = backingIndices.get(0); diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java index 68c1d7ab178c2..9d73db7089929 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java @@ -15,16 +15,10 @@ import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; -import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -32,18 +26,12 @@ import static org.elasticsearch.cluster.metadata.ClusterChangedEventUtils.indicesCreated; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; -import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices; -import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.is; -public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase { +public class DataStreamLifecycleDownsampleIT extends DownsamplingIntegTestCase { public static final int DOC_COUNT = 50_000; - @Override - protected Collection<Class<? extends Plugin>> nodePlugins() { - return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class); - } - @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)); @@ -66,8 +54,7 @@ public void testDownsampling() throws Exception { ) .build(); - DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs( - client(), + setupTSDBDataStreamAndIngestDocs( dataStreamName,
"1986-01-08T23:40:53.384Z", "2022-01-08T23:40:53.384Z", @@ -76,7 +63,7 @@ public void testDownsampling() throws Exception { "1990-09-09T18:00:00" ); - List<String> backingIndices = getBackingIndices(client(), dataStreamName); + List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; @@ -93,7 +80,7 @@ public void testDownsampling() throws Exception { }); // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with // testing so DSL doesn't have to wait for the end_time to lapse) - putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle); + putTSDBIndexTemplate(dataStreamName, null, null, lifecycle); client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet(); @@ -109,7 +96,7 @@ public void testDownsampling() throws Exception { }, 30, TimeUnit.SECONDS); assertBusy(() -> { - List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName); + List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName); assertThat(dsBackingIndices.size(), is(2)); String writeIndex = dsBackingIndices.get(1); @@ -136,8 +123,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception ) ) .build(); - DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs( - client(), + setupTSDBDataStreamAndIngestDocs( dataStreamName, "1986-01-08T23:40:53.384Z", "2022-01-08T23:40:53.384Z", @@ -146,7 +132,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception "1990-09-09T18:00:00" ); - List<String> backingIndices = getBackingIndices(client(), dataStreamName); + List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; @@ -163,7 +149,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception }); // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with // testing so DSL doesn't have to wait for the end_time to lapse) - putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle); + putTSDBIndexTemplate(dataStreamName, null, null, lifecycle); client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet(); assertBusy(() -> { @@ -173,7 +159,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception }, 30, TimeUnit.SECONDS); assertBusy(() -> { - List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName); + List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName); assertThat(dsBackingIndices.size(), is(2)); String writeIndex = dsBackingIndices.get(1); @@ -201,8 +187,7 @@ public void testUpdateDownsampleRound() throws Exception { ) .build(); - DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs( - client(), + setupTSDBDataStreamAndIngestDocs( dataStreamName, "1986-01-08T23:40:53.384Z", "2022-01-08T23:40:53.384Z", @@ -211,7 +196,7 @@ public void testUpdateDownsampleRound() throws Exception { "1990-09-09T18:00:00" ); - List<String> backingIndices = getBackingIndices(client(), dataStreamName); + List<String> backingIndices =
getDataStreamBackingIndexNames(dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; @@ -228,8 +213,8 @@ public void testUpdateDownsampleRound() throws Exception { }); // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with // testing so DSL doesn't have to wait for the end_time to lapse) - putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle); - client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet(); + putTSDBIndexTemplate(dataStreamName, null, null, lifecycle); + safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null))); assertBusy(() -> { assertThat(witnessedDownsamplingIndices.size(), is(1)); @@ -238,7 +223,7 @@ }, 30, TimeUnit.SECONDS); assertBusy(() -> { - List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName); + List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName); assertThat(dsBackingIndices.size(), is(2)); String writeIndex = dsBackingIndices.get(1); assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); @@ -247,7 +232,7 @@ // update the lifecycle so that it only has one round, for the same `after` parameter as before, but a different interval // the different interval should yield a different downsample index name so we expect the data stream lifecycle to get the previous - // `10s` interval downsample index, downsample it to `30s` and replace it in the data stream instead of the `10s` one. + // `10s` interval downsample index, downsample it to `20m` and replace it in the data stream instead of the `10s` one.
DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.newBuilder() .downsampling( new Downsampling( @@ -255,14 +240,15 @@ ) ) .build(); - - client().execute( - PutDataStreamLifecycleAction.INSTANCE, - new PutDataStreamLifecycleAction.Request( - TEST_REQUEST_TIMEOUT, - TEST_REQUEST_TIMEOUT, - new String[] { dataStreamName }, - updatedLifecycle + assertAcked( + client().execute( + PutDataStreamLifecycleAction.INSTANCE, + new PutDataStreamLifecycleAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + new String[] { dataStreamName }, + updatedLifecycle + ) ) ); @@ -271,7 +257,7 @@ assertBusy(() -> { assertThat(indexExists(tenSecondsDownsampleIndex), is(false)); - List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName); + List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName); assertThat(dsBackingIndices.size(), is(2)); String writeIndex = dsBackingIndices.get(1); assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java deleted file mode 100644 index 64fb9e8f85b9b..0000000000000 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDriver.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0.
- */ - -package org.elasticsearch.xpack.downsample; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.datastreams.GetDataStreamAction; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; -import org.elasticsearch.cluster.metadata.DataStreamLifecycle; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Template; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.compress.CompressedXContent; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.time.DateFormatter; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; - -import java.io.IOException; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - -import static org.elasticsearch.test.ESTestCase.TEST_REQUEST_TIMEOUT; -import static org.elasticsearch.test.ESTestCase.indexSettings; -import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; -import static org.elasticsearch.test.ESTestCase.randomFrom; -import static org.elasticsearch.test.ESTestCase.randomIntBetween; -import static org.elasticsearch.test.ESTestCase.randomLongBetween; -import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -/** - * A collection of methods to help with the setup of data stream lifecycle downsample tests. 
- */ -public class DataStreamLifecycleDriver { - private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDriver.class); - private static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - public static final String FIELD_TIMESTAMP = "@timestamp"; - public static final String FIELD_DIMENSION_1 = "dimension_kw"; - public static final String FIELD_DIMENSION_2 = "dimension_long"; - public static final String FIELD_METRIC_COUNTER = "counter"; - - public static int setupTSDBDataStreamAndIngestDocs( - Client client, - String dataStreamName, - @Nullable String startTime, - @Nullable String endTime, - DataStreamLifecycle lifecycle, - int docCount, - String firstDocTimestamp - ) throws IOException { - putTSDBIndexTemplate(client, dataStreamName + "*", startTime, endTime, lifecycle); - return indexDocuments(client, dataStreamName, docCount, firstDocTimestamp); - } - - public static List<String> getBackingIndices(Client client, String dataStreamName) { - GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request( - TEST_REQUEST_TIMEOUT, - new String[] { dataStreamName } - ); - GetDataStreamAction.Response getDataStreamResponse = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet(); - assertThat(getDataStreamResponse.getDataStreams().isEmpty(), is(false)); - assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), is(dataStreamName)); - return getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().stream().map(Index::getName).toList(); - } - - public static void putTSDBIndexTemplate( - Client client, - String pattern, - @Nullable String startTime, - @Nullable String endTime, - DataStreamLifecycle lifecycle - ) throws IOException { - Settings.Builder settings = indexSettings(1, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) - .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1)); - - if (Strings.hasText(startTime)) { - settings.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), startTime); - } - - if (Strings.hasText(endTime)) { - settings.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), endTime); - } - - XContentBuilder mapping = jsonBuilder().startObject().startObject("_doc").startObject("properties"); - mapping.startObject(FIELD_TIMESTAMP).field("type", "date").endObject(); - - mapping.startObject(FIELD_DIMENSION_1).field("type", "keyword").field("time_series_dimension", true).endObject(); - mapping.startObject(FIELD_DIMENSION_2).field("type", "long").field("time_series_dimension", true).endObject(); - - mapping.startObject(FIELD_METRIC_COUNTER) - .field("type", "double") /* numeric label indexed as a metric */ - .field("time_series_metric", "counter") - .endObject(); - - mapping.endObject().endObject().endObject(); - - putComposableIndexTemplate( - client, - "id1", - CompressedXContent.fromJSON(Strings.toString(mapping)), - List.of(pattern), - settings.build(), - null, - lifecycle - ); - } - - private static void putComposableIndexTemplate( - Client client, - String id, - @Nullable CompressedXContent mappings, - List<String> patterns, - @Nullable Settings settings, - @Nullable Map<String, Object> metadata, - @Nullable DataStreamLifecycle lifecycle - ) throws IOException { - TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); - request.indexTemplate( - ComposableIndexTemplate.builder() - .indexPatterns(patterns) -
.template(Template.builder().settings(settings).mappings(mappings).lifecycle(lifecycle)) - .metadata(metadata) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) - .build() - ); - client.execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); - } - - private static int indexDocuments(Client client, String dataStreamName, int docCount, String firstDocTimestamp) { - final Supplier<XContentBuilder> sourceSupplier = () -> { - long startTime = LocalDateTime.parse(firstDocTimestamp).atZone(ZoneId.of("UTC")).toInstant().toEpochMilli(); - final String ts = randomDateForInterval(new DateHistogramInterval("1s"), startTime); - double counterValue = DATE_FORMATTER.parseMillis(ts); - final List<String> dimensionValues = new ArrayList<>(5); - for (int j = 0; j < randomIntBetween(1, 5); j++) { - dimensionValues.add(randomAlphaOfLength(6)); - } - try { - return XContentFactory.jsonBuilder() - .startObject() - .field(FIELD_TIMESTAMP, ts) - .field(FIELD_DIMENSION_1, randomFrom(dimensionValues)) - .field(FIELD_DIMENSION_2, randomIntBetween(1, 10)) - .field(FIELD_METRIC_COUNTER, counterValue) - .endObject(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - return bulkIndex(client, dataStreamName, sourceSupplier, docCount); - } - - private static String randomDateForInterval(final DateHistogramInterval interval, final long startTime) { - long endTime = startTime + 10 * interval.estimateMillis(); - return randomDateForRange(startTime, endTime); - } - - private static String randomDateForRange(long start, long end) { - return DATE_FORMATTER.formatMillis(randomLongBetween(start, end)); - } - - private static int bulkIndex(Client client, String dataStreamName, Supplier<XContentBuilder> docSourceSupplier, int docCount) { - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); - XContentBuilder source = docSourceSupplier.get(); - indexRequest.source(source); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - int duplicates = 0; - for (BulkItemResponse response : bulkResponse.getItems()) { - if (response.isFailed()) { - if (response.getFailure().getCause() instanceof VersionConflictEngineException) { - // A duplicate event was created by random generator. We should not fail for this - // reason. - logger.debug("-> failed to insert a duplicate: [{}]", response.getFailureMessage()); - duplicates++; - } else { - throw new ElasticsearchException("Failed to index data: " + bulkResponse.buildFailureMessage()); - } - } - } - int docsIndexed = docCount - duplicates; - logger.info("-> Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates); - return docsIndexed; - } -} diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java new file mode 100644 index 0000000000000..f9ab19c83baab --- /dev/null +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements.
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.downsample.DownsampleAction; +import org.elasticsearch.action.downsample.DownsampleConfig; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.function.Supplier; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT; +import static org.hamcrest.Matchers.equalTo; + +public class DownsampleIT extends DownsamplingIntegTestCase { + + public void testDownsamplingPassthroughDimensions() throws Exception { + String dataStreamName = "metrics-foo"; + // Set up template + putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """ + { + "properties": { + "attributes": { + "type": "passthrough", + "priority": 10, + "time_series_dimension": true + }, + "metrics.cpu_usage": { + "type": "double", + "time_series_metric": "counter" + } + } + } + """, null, null); + + // Create data stream by indexing documents + final Instant now = Instant.now(); + Supplier<XContentBuilder> sourceSupplier = () -> { + String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli()); + try { + return XContentFactory.jsonBuilder() + .startObject() + .field("@timestamp", ts) + .field("attributes.host.name", randomFrom("host1", "host2", "host3")) + .field("metrics.cpu_usage", randomDouble()) + .endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + bulkIndex(dataStreamName, sourceSupplier, 100); + // Rollover to ensure the index we will downsample is not the write index + assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null))); + List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2); + String sourceIndex = backingIndices.get(0); + String interval = "5m"; + String targetIndex = "downsample-" + interval + "-" + sourceIndex; + // Set the source index to read-only state + assertAcked( + indicesAdmin().prepareUpdateSettings(sourceIndex) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build()) + ); + + DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval)); + assertAcked( + client().execute( + DownsampleAction.INSTANCE, + new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig) + ) + ); + + assertBusy(() -> { + var response = indicesAdmin().getIndex(new GetIndexRequest().indices(targetIndex)).actionGet(); + String downsampleStatus = response.getSetting(targetIndex, IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()); + assertThat(downsampleStatus, equalTo("success")); + }); + + assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig); + } +} diff --git
a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java new file mode 100644 index 0000000000000..76306054df54d --- /dev/null +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java @@ -0,0 +1,324 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.stats.MappingVisitor; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.downsample.DownsampleConfig; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM; +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static 
org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; +/** + * Base test case for downsampling integration tests. It provides helper methods to: + * - set up templates and data streams + * - index documents + * - assert the correctness of mappings, settings, etc. + */ +public abstract class DownsamplingIntegTestCase extends ESIntegTestCase { + private static final Logger logger = LogManager.getLogger(DownsamplingIntegTestCase.class); + static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + static final String FIELD_TIMESTAMP = "@timestamp"; + static final String FIELD_DIMENSION_KEYWORD = "dimension_kw"; + static final String FIELD_DIMENSION_LONG = "dimension_long"; + static final String FIELD_METRIC_COUNTER_DOUBLE = "counter"; + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class); + } + + /** + * Sets up a TSDB data stream and ingests the specified number of documents + * @return the count of indexed documents + */ + public int setupTSDBDataStreamAndIngestDocs( + String dataStreamName, + @Nullable String startTime, + @Nullable String endTime, + DataStreamLifecycle lifecycle, + int docCount, + String firstDocTimestamp + ) throws IOException { + putTSDBIndexTemplate(dataStreamName + "*", startTime, endTime, lifecycle); + return indexDocuments(dataStreamName, docCount, firstDocTimestamp); + } + + /** + * Creates a composable index template that will create TSDB data streams + */ + public void putTSDBIndexTemplate(String pattern, @Nullable String startTime, @Nullable String endTime, DataStreamLifecycle lifecycle) throws IOException { + Settings.Builder settings = indexSettings(1, 0).putList( + IndexMetadata.INDEX_ROUTING_PATH.getKey(), + List.of(FIELD_DIMENSION_KEYWORD) + ); + + if (Strings.hasText(startTime)) { + settings.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), startTime); + } + + if (Strings.hasText(endTime)) { + settings.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), endTime); + } + + String mappingString = String.format(Locale.ROOT, """ + { + "properties": { + "@timestamp": { + "type": "date" + }, + "%s": { + "type": "keyword", + "time_series_dimension": true + }, + "%s": { + "type": "long", + "time_series_dimension": true + }, + "%s": { + "type": "double", + "time_series_metric": "counter" + } + } + }""", FIELD_DIMENSION_KEYWORD, FIELD_DIMENSION_LONG, FIELD_METRIC_COUNTER_DOUBLE); + + putTSDBIndexTemplate("id1", List.of(pattern), settings.build(), mappingString, lifecycle, null); + } + + void putTSDBIndexTemplate( + String id, + List<String> patterns, + @Nullable Settings settings, + @Nullable String mappingString, + @Nullable DataStreamLifecycle lifecycle, + @Nullable Map<String, Object> metadata + ) throws IOException { + Settings.Builder settingsBuilder = Settings.builder(); + if (settings != null) { + settingsBuilder.put(settings); + } + // Ensure it will be a TSDB data stream + settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES); + CompressedXContent mappings = mappingString == null ?
null : CompressedXContent.fromJSON(mappingString); + TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); + request.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(patterns) + .template(Template.builder().settings(settingsBuilder).mappings(mappings).lifecycle(lifecycle)) + .metadata(metadata) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build() + ); + assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request)); + } + + /** + * Creates and indexes the specified number of documents using the given document source supplier. + * @return the count of indexed documents + */ + int bulkIndex(String dataStreamName, Supplier<XContentBuilder> docSourceSupplier, int docCount) { + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < docCount; i++) { + IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE); + XContentBuilder source = docSourceSupplier.get(); + indexRequest.source(source); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + int duplicates = 0; + for (BulkItemResponse response : bulkResponse.getItems()) { + if (response.isFailed()) { + if (response.getFailure().getCause() instanceof VersionConflictEngineException) { + // A duplicate event was created by random generator. We should not fail for this + // reason. + logger.debug("-> failed to insert a duplicate: [{}]", response.getFailureMessage()); + duplicates++; + } else { + throw new ElasticsearchException("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + } + } + int docsIndexed = docCount - duplicates; + logger.info("-> Indexed [{}] documents.
Dropped [{}] duplicates.", docsIndexed, duplicates); + return docsIndexed; + } + + int indexDocuments(String dataStreamName, int docCount, String firstDocTimestamp) { + final Supplier sourceSupplier = () -> { + long startTime = LocalDateTime.parse(firstDocTimestamp).atZone(ZoneId.of("UTC")).toInstant().toEpochMilli(); + final String ts = randomDateForInterval(new DateHistogramInterval("1s"), startTime); + double counterValue = DATE_FORMATTER.parseMillis(ts); + final List dimensionValues = new ArrayList<>(5); + for (int j = 0; j < randomIntBetween(1, 5); j++) { + dimensionValues.add(randomAlphaOfLength(6)); + } + try { + return XContentFactory.jsonBuilder() + .startObject() + .field(FIELD_TIMESTAMP, ts) + .field(FIELD_DIMENSION_KEYWORD, randomFrom(dimensionValues)) + .field(FIELD_DIMENSION_LONG, randomIntBetween(1, 10)) + .field(FIELD_METRIC_COUNTER_DOUBLE, counterValue) + .endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + return bulkIndex(dataStreamName, sourceSupplier, docCount); + } + + String randomDateForInterval(final DateHistogramInterval interval, final long startTime) { + long endTime = startTime + 10 * interval.estimateMillis(); + return randomDateForRange(startTime, endTime); + } + + String randomDateForRange(long start, long end) { + return DATE_FORMATTER.formatMillis(randomLongBetween(start, end)); + } + + List waitForDataStreamBackingIndices(String dataStreamName, int backingIndexCount) throws Exception { + List backingIndices = new ArrayList<>(); + assertBusy(() -> { + var indices = getDataStreamBackingIndexNames(dataStreamName); + assertThat(indices.size(), equalTo(backingIndexCount)); + backingIndices.addAll(indices); + }); + return backingIndices; + } + + /** + * Currently we assert the correctness of metrics and dimensions. The assertions can be extended when needed. 
+ */ + @SuppressWarnings("unchecked") + void assertDownsampleIndexFieldsAndDimensions(String sourceIndex, String downsampleIndex, DownsampleConfig config) throws Exception { + GetIndexResponse getIndexResponse = indicesAdmin().prepareGetIndex().setIndices(sourceIndex, downsampleIndex).get(); + assertThat(getIndexResponse.indices(), arrayContaining(sourceIndex, downsampleIndex)); + + // Retrieve field information for the metric fields + final Map sourceIndexMappings = getIndexResponse.mappings().get(sourceIndex).getSourceAsMap(); + final Map downsampleIndexMappings = getIndexResponse.mappings().get(downsampleIndex).getSourceAsMap(); + + final MapperService mapperService = getMapperServiceForIndex(sourceIndex); + final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings); + mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE); + + // Collect expected mappings for fields and dimensions + Map metricFields = new HashMap<>(); + Map dimensionFields = new HashMap<>(); + MappingVisitor.visitMapping(sourceIndexMappings, (field, fieldMapping) -> { + if (isTimeSeriesMetric(fieldMapping)) { + metricFields.put(field, TimeSeriesParams.MetricType.fromString(fieldMapping.get(TIME_SERIES_METRIC_PARAM).toString())); + } else if (hasTimeSeriesDimensionTrue(fieldMapping)) { + // This includes passthrough objects + dimensionFields.put(field, fieldMapping.get("type").toString()); + } + }); + + AtomicBoolean encounteredTimestamp = new AtomicBoolean(false); + Set encounteredMetrics = new HashSet<>(); + Set encounteredDimensions = new HashSet<>(); + MappingVisitor.visitMapping(downsampleIndexMappings, (field, fieldMapping) -> { + if (field.equals(config.getTimestampField())) { + encounteredTimestamp.set(true); + assertThat(fieldMapping.get("type"), equalTo(DateFieldMapper.CONTENT_TYPE)); + Map dateTimeMeta = (Map) fieldMapping.get("meta"); + assertThat(dateTimeMeta.get("time_zone"), equalTo(config.getTimeZone())); + assertThat(dateTimeMeta.get(config.getIntervalType()), equalTo(config.getInterval().toString())); + } else if (metricFields.containsKey(field)) { + encounteredMetrics.add(field); + TimeSeriesParams.MetricType metricType = metricFields.get(field); + switch (metricType) { + case COUNTER -> assertThat(fieldMapping.get("type"), equalTo("double")); + case GAUGE -> assertThat(fieldMapping.get("type"), equalTo("aggregate_metric_double")); + default -> fail("Unsupported field type"); + } + assertThat(fieldMapping.get("time_series_metric"), equalTo(metricType.toString())); + } else if (dimensionFields.containsKey(field)) { + encounteredDimensions.add(field); + assertThat(fieldMapping.get("type"), equalTo(dimensionFields.get(field))); + assertThat(fieldMapping.get("time_series_dimension"), equalTo(true)); + } + }); + assertThat(encounteredTimestamp.get(), equalTo(true)); + assertThat(encounteredMetrics, equalTo(metricFields.keySet())); + assertThat(encounteredDimensions, equalTo(dimensionFields.keySet())); + } + + private static MapperService getMapperServiceForIndex(String sourceIndex) throws IOException { + final IndexMetadata indexMetadata = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT) + .get() + .getState() + .getMetadata() + .index(sourceIndex); + final IndicesService indicesService = internalCluster().getAnyMasterNodeInstance(IndicesService.class); + return indicesService.createIndexMapperServiceForValidation(indexMetadata); + } + + boolean isTimeSeriesMetric(final Map fieldMapping) { + 
final String metricType = (String) fieldMapping.get(TIME_SERIES_METRIC_PARAM); + return metricType != null + && List.of(TimeSeriesParams.MetricType.values()).contains(TimeSeriesParams.MetricType.fromString(metricType)); + } + + private static boolean hasTimeSeriesDimensionTrue(Map fieldMapping) { + return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM)); + } +} diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java index 5fba98b765a6b..9b2a851de5a27 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java @@ -9,24 +9,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.downsample.DownsampleConfig; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.plugins.Plugin; @@ -56,6 +48,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -65,13 +58,8 @@ import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4) -public class ILMDownsampleDisruptionIT extends ESIntegTestCase { +public class ILMDownsampleDisruptionIT extends DownsamplingIntegTestCase { private static final Logger logger = LogManager.getLogger(ILMDownsampleDisruptionIT.class); - private static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - private static final String FIELD_TIMESTAMP = "@timestamp"; - private static final String FIELD_DIMENSION_1 = "dimension_kw"; - private static final String FIELD_DIMENSION_2 = "dimension_long"; - private static final String FIELD_METRIC_COUNTER = "counter"; private static final String POLICY_NAME = "mypolicy"; public static final int DOC_COUNT = 10_000; @@ -99,7 +87,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { public void setup(final String sourceIndex, int numOfShards, int numOfReplicas, 
long startTime) throws IOException { final Settings.Builder settings = indexSettings(numOfShards, numOfReplicas).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) - .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1)) + .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_KEYWORD)) .put( IndexSettings.TIME_SERIES_START_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(Instant.ofEpochMilli(startTime).toEpochMilli()) @@ -113,10 +101,10 @@ public void setup(final String sourceIndex, int numOfShards, int numOfReplicas, final XContentBuilder mapping = jsonBuilder().startObject().startObject("_doc").startObject("properties"); mapping.startObject(FIELD_TIMESTAMP).field("type", "date").endObject(); - mapping.startObject(FIELD_DIMENSION_1).field("type", "keyword").field("time_series_dimension", true).endObject(); - mapping.startObject(FIELD_DIMENSION_2).field("type", "long").field("time_series_dimension", true).endObject(); + mapping.startObject(FIELD_DIMENSION_KEYWORD).field("type", "keyword").field("time_series_dimension", true).endObject(); + mapping.startObject(FIELD_DIMENSION_LONG).field("type", "long").field("time_series_dimension", true).endObject(); - mapping.startObject(FIELD_METRIC_COUNTER) + mapping.startObject(FIELD_METRIC_COUNTER_DOUBLE) .field("type", "double") /* numeric label indexed as a metric */ .field("time_series_metric", "counter") .endObject(); @@ -149,20 +137,24 @@ public void testILMDownsampleRollingRestart() throws Exception { long startTime = LocalDateTime.parse("1993-09-09T18:00:00").atZone(ZoneId.of("UTC")).toInstant().toEpochMilli(); setup(sourceIndex, 1, 0, startTime); final DownsampleConfig config = new DownsampleConfig(randomInterval()); - final SourceSupplier sourceSupplier = () -> { + final Supplier sourceSupplier = () -> { final String ts = randomDateForInterval(config.getInterval(), startTime); double counterValue = DATE_FORMATTER.parseMillis(ts); final List dimensionValues = new ArrayList<>(5); for (int j = 0; j < randomIntBetween(1, 5); j++) { dimensionValues.add(randomAlphaOfLength(6)); } - return XContentFactory.jsonBuilder() - .startObject() - .field(FIELD_TIMESTAMP, ts) - .field(FIELD_DIMENSION_1, randomFrom(dimensionValues)) - .field(FIELD_DIMENSION_2, randomIntBetween(1, 10)) - .field(FIELD_METRIC_COUNTER, counterValue) - .endObject(); + try { + return XContentFactory.jsonBuilder() + .startObject() + .field(FIELD_TIMESTAMP, ts) + .field(FIELD_DIMENSION_KEYWORD, randomFrom(dimensionValues)) + .field(FIELD_DIMENSION_LONG, randomIntBetween(1, 10)) + .field(FIELD_METRIC_COUNTER_DOUBLE, counterValue) + .endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } }; int indexedDocs = bulkIndex(sourceIndex, sourceSupplier, DOC_COUNT); @@ -217,46 +209,4 @@ private void assertTargetIndex(final InternalTestCluster cluster, final String t } ); } - - private int bulkIndex(final String indexName, final SourceSupplier sourceSupplier, int docCount) throws IOException { - BulkRequestBuilder bulkRequestBuilder = internalCluster().client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < docCount; i++) { - IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE); - XContentBuilder source = sourceSupplier.get(); - indexRequest.source(source); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - int duplicates = 0; - for 
(BulkItemResponse response : bulkResponse.getItems()) { - if (response.isFailed()) { - if (response.getFailure().getCause() instanceof VersionConflictEngineException) { - // A duplicate event was created by random generator. We should not fail for this - // reason. - logger.debug("We tried to insert a duplicate: [{}]", response.getFailureMessage()); - duplicates++; - } else { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); - } - } - } - int docsIndexed = docCount - duplicates; - logger.info("Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates); - return docsIndexed; - } - - private String randomDateForInterval(final DateHistogramInterval interval, final long startTime) { - long endTime = startTime + 10 * interval.estimateMillis(); - return randomDateForRange(startTime, endTime); - } - - private String randomDateForRange(long start, long end) { - return DATE_FORMATTER.formatMillis(randomLongBetween(start, end)); - } - - @FunctionalInterface - public interface SourceSupplier { - XContentBuilder get() throws IOException; - } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java index 342b6e57c9e51..43e860ae78456 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java @@ -40,7 +40,7 @@ static List<FieldValueFetcher> create(final SearchExecutionContext context, fina List<FieldValueFetcher> fetchers = new ArrayList<>(); for (String dimension : dimensions) { MappedFieldType fieldType = context.getFieldType(dimension); - assert fieldType != null : "Unknown dimension field type for dimension field: [" + dimension + "]"; + assert fieldType != null : "Unknown type for dimension field: [" + dimension + "]"; if (context.fieldExistsInIndex(fieldType.name())) { final IndexFieldData<?> fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimeseriesFieldTypeHelper.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimeseriesFieldTypeHelper.java index e539722481df8..93425be0b7d40 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimeseriesFieldTypeHelper.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TimeseriesFieldTypeHelper.java @@ -10,8 +10,10 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.PassThroughObjectMapper; import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper; +import org.elasticsearch.search.suggest.completion.context.ContextMapping; import java.io.IOException; import java.util.List; @@ -47,7 +49,11 @@ public boolean isTimeSeriesMetric(final String unused, final Map<String, Object> fiel } public boolean isTimeSeriesDimension(final String unused, final Map<String, Object> fieldMapping) { - return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM)); + return Boolean.TRUE.equals(fieldMapping.get(TIME_SERIES_DIMENSION_PARAM)) && isPassthroughField(fieldMapping) == false; + } + + public static boolean
isPassthroughField(final Map<String, Object> fieldMapping) { + return PassThroughObjectMapper.CONTENT_TYPE.equals(fieldMapping.get(ContextMapping.FIELD_TYPE)); } public List<String> extractFlattenedDimensions(final String field, final Map<String, Object> fieldMapping) {
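The behavioral change at the heart of this PR is the predicate above: a passthrough object (such as the `attributes` container exercised by `testDownsamplingPassthroughDimensions`) is mapped with `time_series_dimension: true`, but it is a container rather than a leaf field, so downsampling must no longer collect it as a dimension itself; only its leaf subfields (for example `attributes.host.name`) are real dimensions. Before the check existed, the container could be handed to the dimension field fetcher, where the `fieldType != null` assertion in `DimensionFieldValueFetcher` shows there is no leaf field type to resolve. Below is a minimal, self-contained sketch of the new check, using plain string literals in place of the Elasticsearch constants (`TIME_SERIES_DIMENSION_PARAM` resolves to "time_series_dimension" and `PassThroughObjectMapper.CONTENT_TYPE` to "passthrough"); it is illustrative only, not the production code path.

```java
import java.util.List;
import java.util.Map;

public class PassthroughDimensionCheck {

    // Mirrors TimeseriesFieldTypeHelper.isTimeSeriesDimension after this PR: a field is a
    // dimension only if it is flagged as one AND it is not a passthrough container.
    static boolean isTimeSeriesDimension(Map<String, Object> fieldMapping) {
        return Boolean.TRUE.equals(fieldMapping.get("time_series_dimension"))
            && isPassthroughField(fieldMapping) == false;
    }

    // Mirrors the new isPassthroughField helper: the mapping's "type" entry identifies
    // passthrough objects.
    static boolean isPassthroughField(Map<String, Object> fieldMapping) {
        return "passthrough".equals(fieldMapping.get("type"));
    }

    public static void main(String[] args) {
        // The container is flagged time_series_dimension, but is not itself a dimension field.
        Map<String, Object> attributes = Map.of("type", "passthrough", "priority", 10, "time_series_dimension", true);
        // A leaf keyword field under the container is a real dimension.
        Map<String, Object> hostName = Map.of("type", "keyword", "time_series_dimension", true);

        for (Map<String, Object> mapping : List.of(attributes, hostName)) {
            System.out.println(mapping.get("type") + " -> dimension: " + isTimeSeriesDimension(mapping));
        }
        // Prints: passthrough -> dimension: false, then keyword -> dimension: true
    }
}
```

With the container excluded, only the leaf fields under it are fetched and copied into the downsample index, which is what the new `assertDownsampleIndexFieldsAndDimensions` helper verifies end to end (note that its expected-dimension collection deliberately keeps passthrough objects, so the test would fail if the downsample index ever mapped the container as a dimension).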