Skip to content

Commit ef53c7e

Browse files
committed
Fix ReindexDataStreamIndexAction timestamp validation bug in tests (elastic#122274)
Fix race condition test bugs related to the reindex-data-stream-pipeline. For tests that add doc without timestamp, then add mapping with timestamp, ensure green between adding doc and adding mapping. This makes sure that doc has been written to all shards and thus that timestamp validation does not occur while doc is being written to a shard. Delete pipeline in Before method, then wait for it to be re-created by the MigrateTemplateRegistry. (cherry picked from commit 05a2003) # Conflicts: # muted-tests.yml # x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java
1 parent 8d55836 commit ef53c7e

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -492,9 +492,6 @@ tests:
492492
- class: org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilderTests
493493
method: testInvalidMaxAnalyzedOffset
494494
issue: https://github.com/elastic/elasticsearch/issues/121361
495-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
496-
method: testTimestamp0AddedIfMissing
497-
issue: https://github.com/elastic/elasticsearch/issues/121745
498495
- class: org.elasticsearch.xpack.security.profile.ProfileIntegTests
499496
method: testGetUsersWithProfileUid
500497
issue: https://github.com/elastic/elasticsearch/issues/121483

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.elasticsearch.xcontent.XContentType;
5252
import org.elasticsearch.xpack.migrate.MigratePlugin;
5353
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
54-
import org.junit.After;
54+
import org.junit.Before;
5555

5656
import java.io.IOException;
5757
import java.time.Instant;
@@ -69,14 +69,15 @@
6969
import static org.hamcrest.Matchers.equalTo;
7070

7171
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
72-
@After
73-
private void cleanup() {
72+
@Before
73+
private void setup() throws Exception {
7474
safeGet(
7575
clusterAdmin().execute(
7676
DeletePipelineTransportAction.TYPE,
7777
new DeletePipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
7878
)
7979
);
80+
assertBusy(() -> { assertTrue(getPipelines(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME).isFound()); });
8081
}
8182

8283
private static final String MAPPING = """
@@ -121,6 +122,9 @@ public void testTimestamp0AddedIfMissing() {
121122
// add doc without timestamp
122123
addDoc(sourceIndex, "{\"foo\":\"baz\"}");
123124

125+
// wait until doc is written to all shards before adding mapping
126+
ensureHealth(sourceIndex);
127+
124128
// add timestamp to source mapping
125129
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
126130

@@ -136,6 +140,7 @@ public void testTimestamp0AddedIfMissing() {
136140
}
137141

138142
public void testTimestampNotAddedIfExists() {
143+
139144
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
140145
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
141146

@@ -144,6 +149,9 @@ public void testTimestampNotAddedIfExists() {
144149
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
145150
addDoc(sourceIndex, doc);
146151

152+
// wait until doc is written to all shards before adding mapping
153+
ensureHealth(sourceIndex);
154+
147155
// add timestamp to source mapping
148156
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
149157

@@ -189,6 +197,9 @@ public void testCustomReindexPipeline() {
189197
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
190198
addDoc(sourceIndex, doc);
191199

200+
// wait until doc is written to all shards before adding mapping
201+
ensureHealth(sourceIndex);
202+
192203
// add timestamp to source mapping
193204
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
194205

@@ -298,7 +309,7 @@ public void testMissingSourceIndex() {
298309
);
299310
}
300311

301-
public void testSettingsAddedBeforeReindex() throws Exception {
312+
public void testSettingsAddedBeforeReindex() {
302313
// start with a static setting
303314
var numShards = randomIntBetween(1, 10);
304315
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
@@ -603,4 +614,12 @@ void addDoc(String index, String doc) {
603614
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
604615
safeGet(client().bulk(bulkRequest));
605616
}
617+
618+
private void ensureHealth(String index) {
619+
if (cluster().numDataNodes() > 1) {
620+
ensureGreen(index);
621+
} else {
622+
ensureYellow(index);
623+
}
624+
}
606625
}

0 commit comments

Comments
 (0)