Skip to content

Commit ffd089e

Browse files
committed
Fix ReindexDataStreamIndexAction timestamp validation bug in tests (elastic#122274)
Fix race condition test bugs related to the reindex-data-stream-pipeline. For tests that add doc without timestamp, then add mapping with timestamp, ensure green between adding doc and adding mapping. This makes sure that doc has been written to all shards and thus that timestamp validation does not occur while doc is being written to a shard. Delete pipeline in Before method, then wait for it to be re-created by the MigrateTemplateRegistry.
1 parent 3d2c68c commit ffd089e

File tree

2 files changed

+24
-28
lines changed

2 files changed

+24
-28
lines changed

muted-tests.yml

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -329,32 +329,8 @@ tests:
329329
- class: org.elasticsearch.search.CrossClusterSearchUnavailableClusterIT
330330
method: testSearchSkipUnavailable
331331
issue: https://github.com/elastic/elasticsearch/issues/121497
332-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
333-
method: testDestIndexContainsDocs
334-
issue: https://github.com/elastic/elasticsearch/issues/121915
335-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
336-
method: testTsdbStartEndSet
337-
issue: https://github.com/elastic/elasticsearch/issues/121916
338-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
339-
method: testDestIndexNameSet_noDotPrefix
340-
issue: https://github.com/elastic/elasticsearch/issues/121772
341332
- class: org.elasticsearch.ingest.geoip.FullClusterRestartIT
342333
issue: https://github.com/elastic/elasticsearch/issues/121935
343-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
344-
method: testTimestampNotAddedIfExists
345-
issue: https://github.com/elastic/elasticsearch/issues/121842
346-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
347-
method: testDestIndexNameSet_withDotPrefix
348-
issue: https://github.com/elastic/elasticsearch/issues/121977
349-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
350-
method: testFailIfMetadataBlockSet
351-
issue: https://github.com/elastic/elasticsearch/issues/121978
352-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
353-
method: testFailIfReadBlockSet
354-
issue: https://github.com/elastic/elasticsearch/issues/122123
355-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
356-
method: testTimestamp0AddedIfMissing
357-
issue: https://github.com/elastic/elasticsearch/issues/121745
358334

359335
# Examples:
360336
#

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import org.elasticsearch.xcontent.XContentType;
5050
import org.elasticsearch.xpack.migrate.MigratePlugin;
5151
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
52-
import org.junit.After;
52+
import org.junit.Before;
5353

5454
import java.io.IOException;
5555
import java.time.Instant;
@@ -67,9 +67,11 @@
6767
import static org.hamcrest.Matchers.equalTo;
6868

6969
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
70-
@After
71-
private void cleanup() {
70+
71+
@Before
72+
private void setup() throws Exception {
7273
deletePipeline(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME);
74+
assertBusy(() -> { assertTrue(getPipelines(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME).isFound()); });
7375
}
7476

7577
private static final String MAPPING = """
@@ -114,6 +116,9 @@ public void testTimestamp0AddedIfMissing() {
114116
// add doc without timestamp
115117
addDoc(sourceIndex, "{\"foo\":\"baz\"}");
116118

119+
// wait until doc is written to all shards before adding mapping
120+
ensureHealth(sourceIndex);
121+
117122
// add timestamp to source mapping
118123
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
119124

@@ -129,6 +134,7 @@ public void testTimestamp0AddedIfMissing() {
129134
}
130135

131136
public void testTimestampNotAddedIfExists() {
137+
132138
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
133139
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
134140

@@ -137,6 +143,9 @@ public void testTimestampNotAddedIfExists() {
137143
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
138144
addDoc(sourceIndex, doc);
139145

146+
// wait until doc is written to all shards before adding mapping
147+
ensureHealth(sourceIndex);
148+
140149
// add timestamp to source mapping
141150
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
142151

@@ -184,6 +193,9 @@ public void testCustomReindexPipeline() {
184193
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
185194
addDoc(sourceIndex, doc);
186195

196+
// wait until doc is written to all shards before adding mapping
197+
ensureHealth(sourceIndex);
198+
187199
// add timestamp to source mapping
188200
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
189201

@@ -293,7 +305,7 @@ public void testMissingSourceIndex() {
293305
);
294306
}
295307

296-
public void testSettingsAddedBeforeReindex() throws Exception {
308+
public void testSettingsAddedBeforeReindex() {
297309
// start with a static setting
298310
var numShards = randomIntBetween(1, 10);
299311
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
@@ -604,4 +616,12 @@ void addDoc(String index, String doc) {
604616
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
605617
safeGet(client().bulk(bulkRequest));
606618
}
619+
620+
private void ensureHealth(String index) {
621+
if (cluster().numDataNodes() > 1) {
622+
ensureGreen(index);
623+
} else {
624+
ensureYellow(index);
625+
}
626+
}
607627
}

0 commit comments

Comments
 (0)