PR #127752 (merged): Downsampling does not consider passthrough fields as dimensions
docs/changelog/127752.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
+pr: 127752
+summary: Downsampling does not consider passthrough fields as dimensions
+area: Downsampling
+type: bug
+issues:
+ - 125156
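
For readers without the full diff: the entry above describes a fix in how downsampling selects dimension fields. The sketch below is illustrative only — the types and names are hypothetical and not taken from the PR — but it shows the rule the changelog implies: a field counts as a dimension only when it is explicitly flagged as one, not merely because it sits under a passthrough object.

import java.util.List;

public class DimensionSelectionSketch {

    // Hypothetical, simplified stand-in for a field mapping entry.
    record Field(String path, boolean timeSeriesDimension, boolean underPassthrough) {}

    // Only the field's own dimension flag qualifies it; having a passthrough
    // parent is, by itself, not sufficient.
    static boolean isDimension(Field field) {
        return field.timeSeriesDimension();
    }

    public static void main(String[] args) {
        List<Field> fields = List.of(
            new Field("host.name", true, false),         // explicit dimension
            new Field("attributes.region", false, true), // passthrough child, not a dimension
            new Field("attributes.zone", true, true)     // passthrough child, explicitly flagged
        );
        for (Field field : fields) {
            System.out.println(field.path() + " -> dimension=" + isDimension(field));
        }
    }
}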
DataStreamLifecycleDownsampleDisruptionIT.java
@@ -17,33 +17,21 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
-import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
-import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 
-import java.util.Collection;
 import java.util.List;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
-import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
-import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4)
-public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
+public class DataStreamLifecycleDownsampleDisruptionIT extends DownsamplingIntegTestCase {
     private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class);
     public static final int DOC_COUNT = 25_000;
 
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
-    }
-
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
         Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
@@ -69,8 +57,7 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
                 )
             )
             .buildTemplate();
-        DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
-            client(),
+        setupTSDBDataStreamAndIngestDocs(
             dataStreamName,
             "1986-01-08T23:40:53.384Z",
             "2022-01-08T23:40:53.384Z",
@@ -81,9 +68,9 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
 
         // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
         // testing so DSL doesn't have to wait for the end_time to lapse)
-        putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
-        client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
-        String sourceIndex = getBackingIndices(client(), dataStreamName).get(0);
+        putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);
+        safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)));
+        String sourceIndex = getDataStreamBackingIndexNames(dataStreamName).get(0);
         final String targetIndex = "downsample-5m-" + sourceIndex;
 
         /**
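Both test classes now extend DownsamplingIntegTestCase, whose source is not part of this excerpt. A minimal sketch of what it plausibly centralizes, inferred from the overrides removed above and the helper calls in the diffs; everything beyond the helper names actually invoked in the diffs is an assumption:

import java.util.Collection;
import java.util.List;

import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;

// Assumed to live in org.elasticsearch.xpack.downsample, so Downsample.class
// is referenced without an import.
public abstract class DownsamplingIntegTestCase extends ESIntegTestCase {

    // The plugin list both subclasses previously declared themselves.
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
    }

    // Instance helpers the subclasses call — setupTSDBDataStreamAndIngestDocs(...),
    // putTSDBIndexTemplate(...), updateDataLifecycle(...) — presumably wrap the
    // client()-based plumbing that DataStreamLifecycleDriver used to expose as
    // static methods taking a Client argument. Their signatures are not shown
    // in this excerpt.
}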
DataStreamLifecycleDownsampleIT.java
@@ -9,40 +9,26 @@
 
 import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
-import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
-import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
-import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.junit.annotations.TestLogging;
-import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
-import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.cluster.metadata.ClusterChangedEventUtils.indicesCreated;
 import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
-import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
-import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
 import static org.hamcrest.Matchers.is;
 
-public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
+public class DataStreamLifecycleDownsampleIT extends DownsamplingIntegTestCase {
     public static final int DOC_COUNT = 50_000;
 
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
-    }
-
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
         Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
@@ -69,8 +55,7 @@ public void testDownsampling() throws Exception {
             )
             .buildTemplate();
 
-        DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
-            client(),
+        setupTSDBDataStreamAndIngestDocs(
             dataStreamName,
             "1986-01-08T23:40:53.384Z",
             "2022-01-08T23:40:53.384Z",
@@ -79,7 +64,7 @@ public void testDownsampling() throws Exception {
             "1990-09-09T18:00:00"
         );
 
-        List<String> backingIndices = getBackingIndices(client(), dataStreamName);
+        List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
         String firstGenerationBackingIndex = backingIndices.get(0);
         String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex;
         String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex;
@@ -96,7 +81,7 @@ public void testDownsampling() throws Exception {
         });
         // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
         // testing so DSL doesn't have to wait for the end_time to lapse)
-        putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
+        putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);
 
         client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
 
@@ -112,7 +97,7 @@ public void testDownsampling() throws Exception {
         }, 30, TimeUnit.SECONDS);
 
         assertBusy(() -> {
-            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+            List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
 
             assertThat(dsBackingIndices.size(), is(2));
             String writeIndex = dsBackingIndices.get(1);
@@ -143,8 +128,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
                 )
             )
             .buildTemplate();
-        DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
-            client(),
+        setupTSDBDataStreamAndIngestDocs(
             dataStreamName,
             "1986-01-08T23:40:53.384Z",
             "2022-01-08T23:40:53.384Z",
@@ -153,7 +137,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
             "1990-09-09T18:00:00"
         );
 
-        List<String> backingIndices = getBackingIndices(client(), dataStreamName);
+        List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
         String firstGenerationBackingIndex = backingIndices.get(0);
         String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex;
         String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex;
@@ -170,7 +154,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
         });
         // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
         // testing so DSL doesn't have to wait for the end_time to lapse)
-        putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
+        putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);
         client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
 
         assertBusy(() -> {
@@ -180,7 +164,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
         }, 30, TimeUnit.SECONDS);
 
         assertBusy(() -> {
-            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+            List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
 
             assertThat(dsBackingIndices.size(), is(2));
             String writeIndex = dsBackingIndices.get(1);
@@ -212,8 +196,7 @@ public void testUpdateDownsampleRound() throws Exception {
             )
             .buildTemplate();
 
-        DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
-            client(),
+        setupTSDBDataStreamAndIngestDocs(
             dataStreamName,
             "1986-01-08T23:40:53.384Z",
             "2022-01-08T23:40:53.384Z",
@@ -222,7 +205,7 @@ public void testUpdateDownsampleRound() throws Exception {
             "1990-09-09T18:00:00"
         );
 
-        List<String> backingIndices = getBackingIndices(client(), dataStreamName);
+        List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
         String firstGenerationBackingIndex = backingIndices.get(0);
         String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex;
         String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex;
@@ -239,8 +222,8 @@ public void testUpdateDownsampleRound() throws Exception {
         });
         // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
         // testing so DSL doesn't have to wait for the end_time to lapse)
-        putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
-        client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
+        putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);
+        safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)));
 
         assertBusy(() -> {
             assertThat(witnessedDownsamplingIndices.size(), is(1));
@@ -249,7 +232,7 @@ public void testUpdateDownsampleRound() throws Exception {
         }, 30, TimeUnit.SECONDS);
 
         assertBusy(() -> {
-            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+            List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
             assertThat(dsBackingIndices.size(), is(2));
             String writeIndex = dsBackingIndices.get(1);
             assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
@@ -258,34 +241,27 @@ public void testUpdateDownsampleRound() throws Exception {
 
         // update the lifecycle so that it only has one round, for the same `after` parameter as before, but a different interval
         // the different interval should yield a different downsample index name so we expect the data stream lifecycle to get the previous
-        // `10s` interval downsample index, downsample it to `30s` and replace it in the data stream instead of the `10s` one.
-        DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.dataLifecycleBuilder()
-            .downsampling(
-                List.of(
-                    new DataStreamLifecycle.DownsamplingRound(
-                        TimeValue.timeValueMillis(10),
-                        new DownsampleConfig(new DateHistogramInterval("20m"))
-                    )
-                )
-            )
-            .build();
-
-        client().execute(
-            PutDataStreamLifecycleAction.INSTANCE,
-            new PutDataStreamLifecycleAction.Request(
-                TEST_REQUEST_TIMEOUT,
-                TEST_REQUEST_TIMEOUT,
-                new String[] { dataStreamName },
-                updatedLifecycle
-            )
-        );
+        // `10s` interval downsample index, downsample it to `20m` and replace it in the data stream instead of the `10s` one.
+        updateDataLifecycle(
+            dataStreamName,
+            DataStreamLifecycle.dataLifecycleBuilder()
+                .downsampling(
+                    List.of(
+                        new DataStreamLifecycle.DownsamplingRound(
+                            TimeValue.timeValueMillis(10),
+                            new DownsampleConfig(new DateHistogramInterval("20m"))
+                        )
+                    )
+                )
+                .build()
+        );
 
         String thirtySecondsDownsampleIndex = "downsample-20m-" + firstGenerationBackingIndex;
 
         assertBusy(() -> {
             assertThat(indexExists(tenSecondsDownsampleIndex), is(false));
 
-            List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
+            List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
             assertThat(dsBackingIndices.size(), is(2));
             String writeIndex = dsBackingIndices.get(1);
             assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
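A note on the naming convention the assertions above rely on: the downsample target index embeds the round's fixed interval in its name, which is why changing the round's interval (here from 10m to 20m) yields a distinct target index and lets the data stream lifecycle replace the old one. A trivial sketch of the convention as these tests use it; the helper and the sample index name are illustrative, not Elasticsearch API:

public class DownsampleIndexNaming {

    // Mirrors the string concatenation in the tests, e.g. "downsample-5m-" + sourceIndex.
    static String downsampleIndexName(String fixedInterval, String sourceIndex) {
        return "downsample-" + fixedInterval + "-" + sourceIndex;
    }

    public static void main(String[] args) {
        String source = ".ds-metrics-foo-2025.01.01-000001"; // hypothetical backing index name
        System.out.println(downsampleIndexName("10m", source)); // replaced by the 20m round...
        System.out.println(downsampleIndexName("20m", source)); // ...because the names differ
    }
}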