Skip to content

Commit 4108492

Browse files
[8.x] Fix ReindexDataStreamIndexAction timestamp validation bug in tests (#122274) (#122382)
* Fix ReindexDataStreamIndexAction timestamp validation bug in tests (#122274) Fix race condition test bugs related to the reindex-data-stream-pipeline. For tests that add doc without timestamp, then add mapping with timestamp, ensure green between adding doc and adding mapping. This makes sure that doc has been written to all shards and thus that timestamp validation does not occur while doc is being written to a shard. Delete pipeline in Before method, then wait for it to be re-created by the MigrateTemplateRegistry. (cherry picked from commit 05a2003) # Conflicts: # muted-tests.yml # x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java * getPipelines missing from 8x.
1 parent 900ca08 commit 4108492

File tree

2 files changed

+36
-6
lines changed

2 files changed

+36
-6
lines changed

muted-tests.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,6 @@ tests:
513513
- class: org.elasticsearch.xpack.transform.checkpoint.TransformCCSCanMatchIT
514514
method: testTransformLifecycle_RangeQueryThatMatchesNoShards
515515
issue: https://github.com/elastic/elasticsearch/issues/121480
516-
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
517-
issue: https://github.com/elastic/elasticsearch/issues/121737
518516
- class: org.elasticsearch.xpack.security.authc.service.ServiceAccountSingleNodeTests
519517
method: testAuthenticateWithServiceFileToken
520518
issue: https://github.com/elastic/elasticsearch/issues/120988

x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.elasticsearch.action.index.IndexRequest;
2727
import org.elasticsearch.action.ingest.DeletePipelineRequest;
2828
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
29+
import org.elasticsearch.action.ingest.GetPipelineAction;
30+
import org.elasticsearch.action.ingest.GetPipelineRequest;
2931
import org.elasticsearch.action.ingest.PutPipelineRequest;
3032
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
3133
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -51,7 +53,7 @@
5153
import org.elasticsearch.xcontent.XContentType;
5254
import org.elasticsearch.xpack.migrate.MigratePlugin;
5355
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
54-
import org.junit.After;
56+
import org.junit.Before;
5557

5658
import java.io.IOException;
5759
import java.time.Instant;
@@ -69,14 +71,26 @@
6971
import static org.hamcrest.Matchers.equalTo;
7072

7173
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
72-
@After
73-
private void cleanup() {
74+
75+
@Before
76+
private void setup() throws Exception {
7477
safeGet(
7578
clusterAdmin().execute(
7679
DeletePipelineTransportAction.TYPE,
7780
new DeletePipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
7881
)
7982
);
83+
84+
assertBusy(() -> {
85+
assertTrue(
86+
safeGet(
87+
clusterAdmin().execute(
88+
GetPipelineAction.INSTANCE,
89+
new GetPipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
90+
)
91+
).isFound()
92+
);
93+
});
8094
}
8195

8296
private static final String MAPPING = """
@@ -121,6 +135,9 @@ public void testTimestamp0AddedIfMissing() {
121135
// add doc without timestamp
122136
addDoc(sourceIndex, "{\"foo\":\"baz\"}");
123137

138+
// wait until doc is written to all shards before adding mapping
139+
ensureHealth(sourceIndex);
140+
124141
// add timestamp to source mapping
125142
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
126143

@@ -136,6 +153,7 @@ public void testTimestamp0AddedIfMissing() {
136153
}
137154

138155
public void testTimestampNotAddedIfExists() {
156+
139157
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
140158
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
141159

@@ -144,6 +162,9 @@ public void testTimestampNotAddedIfExists() {
144162
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
145163
addDoc(sourceIndex, doc);
146164

165+
// wait until doc is written to all shards before adding mapping
166+
ensureHealth(sourceIndex);
167+
147168
// add timestamp to source mapping
148169
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
149170

@@ -189,6 +210,9 @@ public void testCustomReindexPipeline() {
189210
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
190211
addDoc(sourceIndex, doc);
191212

213+
// wait until doc is written to all shards before adding mapping
214+
ensureHealth(sourceIndex);
215+
192216
// add timestamp to source mapping
193217
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
194218

@@ -298,7 +322,7 @@ public void testMissingSourceIndex() {
298322
);
299323
}
300324

301-
public void testSettingsAddedBeforeReindex() throws Exception {
325+
public void testSettingsAddedBeforeReindex() {
302326
// start with a static setting
303327
var numShards = randomIntBetween(1, 10);
304328
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
@@ -603,4 +627,12 @@ void addDoc(String index, String doc) {
603627
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
604628
safeGet(client().bulk(bulkRequest));
605629
}
630+
631+
private void ensureHealth(String index) {
632+
if (cluster().numDataNodes() > 1) {
633+
ensureGreen(index);
634+
} else {
635+
ensureYellow(index);
636+
}
637+
}
606638
}

0 commit comments

Comments
 (0)