4949import org .elasticsearch .xcontent .XContentType ;
5050import org .elasticsearch .xpack .migrate .MigratePlugin ;
5151import org .elasticsearch .xpack .migrate .MigrateTemplateRegistry ;
52- import org .junit .After ;
52+ import org .junit .Before ;
5353
5454import java .io .IOException ;
5555import java .time .Instant ;
6767import static org .hamcrest .Matchers .equalTo ;
6868
6969public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
70- @ After
71- private void cleanup () {
70+
71+ @ Before
72+ private void setup () throws Exception {
7273 deletePipeline (MigrateTemplateRegistry .REINDEX_DATA_STREAM_PIPELINE_NAME );
74+ assertBusy (() -> { assertTrue (getPipelines (MigrateTemplateRegistry .REINDEX_DATA_STREAM_PIPELINE_NAME ).isFound ()); });
7375 }
7476
7577 private static final String MAPPING = """
@@ -114,6 +116,9 @@ public void testTimestamp0AddedIfMissing() {
114116 // add doc without timestamp
115117 addDoc (sourceIndex , "{\" foo\" :\" baz\" }" );
116118
119+ // wait until doc is written to all shards before adding mapping
120+ ensureHealth (sourceIndex );
121+
117122 // add timestamp to source mapping
118123 indicesAdmin ().preparePutMapping (sourceIndex ).setSource (DATA_STREAM_MAPPING , XContentType .JSON ).get ();
119124
@@ -129,6 +134,7 @@ public void testTimestamp0AddedIfMissing() {
129134 }
130135
131136 public void testTimestampNotAddedIfExists () {
137+
132138 var sourceIndex = randomAlphaOfLength (20 ).toLowerCase (Locale .ROOT );
133139 safeGet (indicesAdmin ().create (new CreateIndexRequest (sourceIndex )));
134140
@@ -137,6 +143,9 @@ public void testTimestampNotAddedIfExists() {
137143 var doc = String .format (Locale .ROOT , "{\" %s\" :\" %s\" }" , DEFAULT_TIMESTAMP_FIELD , time );
138144 addDoc (sourceIndex , doc );
139145
146+ // wait until doc is written to all shards before adding mapping
147+ ensureHealth (sourceIndex );
148+
140149 // add timestamp to source mapping
141150 indicesAdmin ().preparePutMapping (sourceIndex ).setSource (DATA_STREAM_MAPPING , XContentType .JSON ).get ();
142151
@@ -184,6 +193,9 @@ public void testCustomReindexPipeline() {
184193 var doc = String .format (Locale .ROOT , "{\" %s\" :\" %s\" }" , DEFAULT_TIMESTAMP_FIELD , time );
185194 addDoc (sourceIndex , doc );
186195
196+ // wait until doc is written to all shards before adding mapping
197+ ensureHealth (sourceIndex );
198+
187199 // add timestamp to source mapping
188200 indicesAdmin ().preparePutMapping (sourceIndex ).setSource (DATA_STREAM_MAPPING , XContentType .JSON ).get ();
189201
@@ -293,7 +305,7 @@ public void testMissingSourceIndex() {
293305 );
294306 }
295307
296- public void testSettingsAddedBeforeReindex () throws Exception {
308+ public void testSettingsAddedBeforeReindex () {
297309 // start with a static setting
298310 var numShards = randomIntBetween (1 , 10 );
299311 var staticSettings = Settings .builder ().put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , numShards ).build ();
@@ -604,4 +616,12 @@ void addDoc(String index, String doc) {
604616 bulkRequest .add (new IndexRequest (index ).opType (DocWriteRequest .OpType .CREATE ).source (doc , XContentType .JSON ));
605617 safeGet (client ().bulk (bulkRequest ));
606618 }
619+
620+ private void ensureHealth (String index ) {
621+ if (cluster ().numDataNodes () > 1 ) {
622+ ensureGreen (index );
623+ } else {
624+ ensureYellow (index );
625+ }
626+ }
607627}
0 commit comments