docs/changelog/127752.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
pr: 127752
summary: Downsampling does not consider passthrough fields as dimensions
area: Downsampling
type: bug
issues:
- 125156
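
For context on this changelog entry: downsampling groups time-series documents by their dimension fields and rolls each metric up per bucket, so whether the sub-fields of a passthrough object count as dimensions directly changes which buckets the downsampled index contains. Below is a minimal, self-contained sketch of that grouping step; it is a toy model, not the Elasticsearch implementation, and the field names (such as "attributes.pod") are invented for illustration.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of the downsampling grouping step: a document is a flat map of
// field name -> value, and documents are bucketed by their dimension values.
public class DownsampleGroupingSketch {

    static Map<List<Object>, Double> downsample(List<Map<String, Object>> docs, List<String> dimensions, String metric) {
        // Group by the tuple of dimension values, averaging the metric per bucket.
        return docs.stream()
            .collect(Collectors.groupingBy(
                doc -> dimensions.stream().map(doc::get).toList(),
                Collectors.averagingDouble(doc -> ((Number) doc.get(metric)).doubleValue())
            ));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = List.of(
            Map.of("host", "a", "attributes.pod", "p1", "cpu", 0.5),
            Map.of("host", "a", "attributes.pod", "p2", "cpu", 0.9)
        );
        // If a passthrough sub-field such as "attributes.pod" is left out of the
        // dimension set, the two series collapse into a single bucket:
        System.out.println(downsample(docs, List.of("host"), "cpu"));                   // one bucket
        System.out.println(downsample(docs, List.of("host", "attributes.pod"), "cpu")); // two buckets
    }
}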
DataStreamLifecycleDownsampleDisruptionIT.java
@@ -19,35 +19,23 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4)
public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
public class DataStreamLifecycleDownsampleDisruptionIT extends DownsamplingIntegTestCase {
private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class);
public static final int DOC_COUNT = 50_000;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
@@ -76,8 +64,7 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
)
)
.build();
DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
client(),
setupTSDBDataStreamAndIngestDocs(
dataStreamName,
"1986-01-08T23:40:53.384Z",
"2022-01-08T23:40:53.384Z",
@@ -88,15 +75,15 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {

// before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
// testing so DSL doesn't have to wait for the end_time to lapse)
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);
safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)));

// DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
// downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution.
long sleepTime = randomLongBetween(3000, 4500);
logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime);
Thread.sleep(sleepTime);
List<String> backingIndices = getBackingIndices(client(), dataStreamName);
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
// first generation index
String sourceIndex = backingIndices.get(0);

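The bulk of the Java diff in this PR is a mechanical refactor: the static helpers on DataStreamLifecycleDriver, which took the client as their first argument, become inherited methods on the shared DownsamplingIntegTestCase base class, which is why the call sites in both test classes lose their client() argument. A toy before/after sketch of that shape (all types here are stand-ins for illustration, not the real test-framework classes):

interface Client {                                   // stand-in for the Elasticsearch client
    void putTemplate(String name);
}

final class StaticDriverBefore {                     // old shape: static helper, client threaded through
    static void putTSDBIndexTemplate(Client client, String name) {
        client.putTemplate(name);
    }
}

abstract class SharedBaseAfter {                     // new shape: the base test case owns the client
    protected abstract Client client();

    void putTSDBIndexTemplate(String name) {         // no client parameter any more
        client().putTemplate(name);
    }
}

Centralizing the client in the base class keeps each test line focused on what it asserts rather than on plumbing.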
DataStreamLifecycleDownsampleIT.java
@@ -15,35 +15,23 @@
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.cluster.metadata.ClusterChangedEventUtils.indicesCreated;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;

public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
public class DataStreamLifecycleDownsampleIT extends DownsamplingIntegTestCase {
public static final int DOC_COUNT = 50_000;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
@@ -66,8 +54,7 @@ public void testDownsampling() throws Exception {
)
.build();

DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
client(),
setupTSDBDataStreamAndIngestDocs(
dataStreamName,
"1986-01-08T23:40:53.384Z",
"2022-01-08T23:40:53.384Z",
@@ -76,7 +63,7 @@ public void testDownsampling() throws Exception {
"1990-09-09T18:00:00"
);

List<String> backingIndices = getBackingIndices(client(), dataStreamName);
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
String firstGenerationBackingIndex = backingIndices.get(0);
String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex;
String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex;
@@ -93,7 +80,7 @@ public void testDownsampling() throws Exception {
});
// before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
// testing so DSL doesn't have to wait for the end_time to lapse)
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);

client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();

@@ -109,7 +96,7 @@ public void testDownsampling() throws Exception {
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);

assertThat(dsBackingIndices.size(), is(2));
String writeIndex = dsBackingIndices.get(1);
@@ -136,8 +123,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
)
)
.build();
DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
client(),
setupTSDBDataStreamAndIngestDocs(
dataStreamName,
"1986-01-08T23:40:53.384Z",
"2022-01-08T23:40:53.384Z",
@@ -146,7 +132,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
"1990-09-09T18:00:00"
);

List<String> backingIndices = getBackingIndices(client(), dataStreamName);
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
String firstGenerationBackingIndex = backingIndices.get(0);
String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex;
String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex;
@@ -163,7 +149,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
});
// before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
// testing so DSL doesn't have to wait for the end_time to lapse)
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);
client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();

assertBusy(() -> {
@@ -173,7 +159,7 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);

assertThat(dsBackingIndices.size(), is(2));
String writeIndex = dsBackingIndices.get(1);
@@ -201,8 +187,7 @@ public void testUpdateDownsampleRound() throws Exception {
)
.build();

DataStreamLifecycleDriver.setupTSDBDataStreamAndIngestDocs(
client(),
setupTSDBDataStreamAndIngestDocs(
dataStreamName,
"1986-01-08T23:40:53.384Z",
"2022-01-08T23:40:53.384Z",
@@ -211,7 +196,7 @@ public void testUpdateDownsampleRound() throws Exception {
"1990-09-09T18:00:00"
);

List<String> backingIndices = getBackingIndices(client(), dataStreamName);
List<String> backingIndices = getDataStreamBackingIndexNames(dataStreamName);
String firstGenerationBackingIndex = backingIndices.get(0);
String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex;
String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex;
@@ -228,8 +213,8 @@ public void testUpdateDownsampleRound() throws Exception {
});
// before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with
// testing so DSL doesn't have to wait for the end_time to lapse)
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
putTSDBIndexTemplate(dataStreamName, null, null, lifecycle);
safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)));

assertBusy(() -> {
assertThat(witnessedDownsamplingIndices.size(), is(1));
@@ -238,7 +223,7 @@ public void testUpdateDownsampleRound() throws Exception {
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertThat(dsBackingIndices.size(), is(2));
String writeIndex = dsBackingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
@@ -247,22 +232,23 @@ public void testUpdateDownsampleRound() throws Exception {

// update the lifecycle so that it only has one round, for the same `after` parameter as before, but a different interval
// the different interval should yield a different downsample index name so we expect the data stream lifecycle to get the previous
// `10s` interval downsample index, downsample it to `30s` and replace it in the data stream instead of the `10s` one.
// `10s` interval downsample index, downsample it to `20m` and replace it in the data stream instead of the `10s` one.
DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.newBuilder()
.downsampling(
new Downsampling(
List.of(new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("20m"))))
)
)
.build();

client().execute(
PutDataStreamLifecycleAction.INSTANCE,
new PutDataStreamLifecycleAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName },
updatedLifecycle
assertAcked(
client().execute(
PutDataStreamLifecycleAction.INSTANCE,
new PutDataStreamLifecycleAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
new String[] { dataStreamName },
updatedLifecycle
)
)
);

@@ -271,7 +257,7 @@ public void testUpdateDownsampleRound() throws Exception {
assertBusy(() -> {
assertThat(indexExists(tenSecondsDownsampleIndex), is(false));

List<String> dsBackingIndices = getBackingIndices(client(), dataStreamName);
List<String> dsBackingIndices = getDataStreamBackingIndexNames(dataStreamName);
assertThat(dsBackingIndices.size(), is(2));
String writeIndex = dsBackingIndices.get(1);
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
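A second pattern in the updated tests: requests whose response is an acknowledgement are wrapped in assertAcked(...) and raw action futures in safeGet(...), so an unacknowledged or failed request fails the test at the call site instead of going unnoticed. The stand-ins below sketch the contract of those two helpers under that assumption; the real implementations live in the Elasticsearch test framework and also enforce timeouts.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class AssertionHelperSketch {

    record AckedResponse(boolean acknowledged) {}    // stand-in for AcknowledgedResponse

    // Fails the test when the cluster did not acknowledge the request.
    static void assertAcked(AckedResponse response) {
        if (response.acknowledged() == false) {
            throw new AssertionError("request was not acknowledged");
        }
    }

    // Waits for the future and converts any failure into a test failure.
    static <T> T safeGet(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for response", e);
        } catch (ExecutionException e) {
            throw new AssertionError(e.getCause());
        }
    }
}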