4949import org .elasticsearch .xcontent .XContentType ;
5050import org .elasticsearch .xpack .migrate .MigratePlugin ;
5151import org .elasticsearch .xpack .migrate .MigrateTemplateRegistry ;
52+ import org .junit .After ;
5253
5354import java .io .IOException ;
5455import java .time .Instant ;
6768
6869public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
6970
71+ @ After
72+ private void cleanup () throws Exception {
73+ deletePipeline (MigrateTemplateRegistry .REINDEX_DATA_STREAM_PIPELINE_NAME );
74+ assertBusy (() -> {
75+ assertTrue (getPipelines (MigrateTemplateRegistry .REINDEX_DATA_STREAM_PIPELINE_NAME ).isFound ());
76+ });
77+ }
78+
7079 private static final String MAPPING = """
7180 {
7281 "_doc":{
@@ -103,16 +112,19 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
103112 """ ;
104113
105114 public void testTimestamp0AddedIfMissing () {
106- // Delete pipeline in case is a custom pipeline from another test
107- // MigrateTemplateRegistry will immediately rebuild pipeline
108- deletePipeline (MigrateTemplateRegistry .REINDEX_DATA_STREAM_PIPELINE_NAME );
109-
110115 var sourceIndex = randomAlphaOfLength (20 ).toLowerCase (Locale .ROOT );
111116 safeGet (indicesAdmin ().create (new CreateIndexRequest (sourceIndex )));
112117
113118 // add doc without timestamp
114119 addDoc (sourceIndex , "{\" foo\" :\" baz\" }" );
115120
121+ // wait until doc is written to all shards before adding mapping
122+ if (cluster ().numDataNodes () > 1 ) {
123+ ensureGreen (sourceIndex );
124+ } else {
125+ ensureYellow (sourceIndex );
126+ }
127+
116128 // add timestamp to source mapping
117129 indicesAdmin ().preparePutMapping (sourceIndex ).setSource (DATA_STREAM_MAPPING , XContentType .JSON ).get ();
118130
@@ -128,9 +140,6 @@ public void testTimestamp0AddedIfMissing() {
128140 }
129141
130142 public void testTimestampNotAddedIfExists () {
131- // Delete pipeline in case is a custom pipeline from another test
132- // MigrateTemplateRegistry will immediately rebuild pipeline
133- deletePipeline (MigrateTemplateRegistry .REINDEX_DATA_STREAM_PIPELINE_NAME );
134143
135144 var sourceIndex = randomAlphaOfLength (20 ).toLowerCase (Locale .ROOT );
136145 safeGet (indicesAdmin ().create (new CreateIndexRequest (sourceIndex )));
@@ -296,7 +305,7 @@ public void testMissingSourceIndex() {
296305 );
297306 }
298307
299- public void testSettingsAddedBeforeReindex () throws Exception {
308+ public void testSettingsAddedBeforeReindex () {
300309 // start with a static setting
301310 var numShards = randomIntBetween (1 , 10 );
302311 var staticSettings = Settings .builder ().put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , numShards ).build ();
0 commit comments