Skip to content

Commit 8b4a0df

Browse files
committed
Fix ReindexDataStreamIndexAction timestamp validation bug in tests (elastic#122274)
Fix race condition test bugs related to the reindex-data-stream-pipeline. For tests that add doc without timestamp, then add mapping with timestamp, ensure green between adding doc and adding mapping. This makes sure that doc has been written to all shards and thus that timestamp validation does not occur while doc is being written to a shard. Delete pipeline in Before method, then wait for it to be re-created by the MigrateTemplateRegistry. (cherry picked from commit 05a2003) # Conflicts: # muted-tests.yml # x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java
1 parent 7bf974c commit 8b4a0df

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

muted-tests.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,6 @@ tests:
513513
- class: org.elasticsearch.xpack.transform.checkpoint.TransformCCSCanMatchIT
514514
method: testTransformLifecycle_RangeQueryThatMatchesNoShards
515515
issue: https://github.com/elastic/elasticsearch/issues/121480
516-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
517-
issue: https://github.com/elastic/elasticsearch/issues/121737
518516
- class: org.elasticsearch.xpack.security.authc.service.ServiceAccountSingleNodeTests
519517
method: testAuthenticateWithServiceFileToken
520518
issue: https://github.com/elastic/elasticsearch/issues/120988

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.elasticsearch.xcontent.XContentType;
5252
import org.elasticsearch.xpack.migrate.MigratePlugin;
5353
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
54-
import org.junit.After;
54+
import org.junit.Before;
5555

5656
import java.io.IOException;
5757
import java.time.Instant;
@@ -69,14 +69,16 @@
6969
import static org.hamcrest.Matchers.equalTo;
7070

7171
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
72-
@After
73-
private void cleanup() {
72+
73+
@Before
74+
private void setup() throws Exception {
7475
safeGet(
7576
clusterAdmin().execute(
7677
DeletePipelineTransportAction.TYPE,
7778
new DeletePipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
7879
)
7980
);
81+
assertBusy(() -> { assertTrue(getPipelines(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME).isFound()); });
8082
}
8183

8284
private static final String MAPPING = """
@@ -121,6 +123,9 @@ public void testTimestamp0AddedIfMissing() {
121123
// add doc without timestamp
122124
addDoc(sourceIndex, "{\"foo\":\"baz\"}");
123125

126+
// wait until doc is written to all shards before adding mapping
127+
ensureHealth(sourceIndex);
128+
124129
// add timestamp to source mapping
125130
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
126131

@@ -136,6 +141,7 @@ public void testTimestamp0AddedIfMissing() {
136141
}
137142

138143
public void testTimestampNotAddedIfExists() {
144+
139145
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
140146
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
141147

@@ -144,6 +150,9 @@ public void testTimestampNotAddedIfExists() {
144150
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
145151
addDoc(sourceIndex, doc);
146152

153+
// wait until doc is written to all shards before adding mapping
154+
ensureHealth(sourceIndex);
155+
147156
// add timestamp to source mapping
148157
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
149158

@@ -189,6 +198,9 @@ public void testCustomReindexPipeline() {
189198
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
190199
addDoc(sourceIndex, doc);
191200

201+
// wait until doc is written to all shards before adding mapping
202+
ensureHealth(sourceIndex);
203+
192204
// add timestamp to source mapping
193205
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
194206

@@ -298,7 +310,7 @@ public void testMissingSourceIndex() {
298310
);
299311
}
300312

301-
public void testSettingsAddedBeforeReindex() throws Exception {
313+
public void testSettingsAddedBeforeReindex() {
302314
// start with a static setting
303315
var numShards = randomIntBetween(1, 10);
304316
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
@@ -603,4 +615,12 @@ void addDoc(String index, String doc) {
603615
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
604616
safeGet(client().bulk(bulkRequest));
605617
}
618+
619+
private void ensureHealth(String index) {
620+
if (cluster().numDataNodes() > 1) {
621+
ensureGreen(index);
622+
} else {
623+
ensureYellow(index);
624+
}
625+
}
606626
}

0 commit comments

Comments
 (0)