3434import java .nio .charset .StandardCharsets ;
3535import java .time .Instant ;
3636import java .util .List ;
37+ import java .util .Locale ;
3738import java .util .Map ;
3839import java .util .Set ;
3940import java .util .concurrent .TimeUnit ;
@@ -208,7 +209,9 @@ public void testUpgradeDataStream() throws Exception {
208209 } else if (CLUSTER_TYPE == ClusterType .UPGRADED ) {
209210 Map <String , Map <String , Object >> oldIndicesMetadata = getIndicesMetadata (dataStreamName );
210211 upgradeDataStream (dataStreamName , numRollovers , numRollovers + 1 , 0 , ilmEnabled );
212+ cancelReindexTask (dataStreamName );
211213 upgradeDataStream (dataStreamFromNonDataStreamIndices , 0 , 1 , 0 , ilmEnabled );
214+ cancelReindexTask (dataStreamFromNonDataStreamIndices );
212215 Map <String , Map <String , Object >> upgradedIndicesMetadata = getIndicesMetadata (dataStreamName );
213216
214217 if (ilmEnabled ) {
@@ -219,6 +222,38 @@ public void testUpgradeDataStream() throws Exception {
219222 }
220223 }
221224
225+ public void testMigrateDoesNotRestartOnUpgrade () throws Exception {
226+ /*
227+ * This test makes sure that if reindex is run and completed, then when the cluster is upgraded the task
228+ * does not begin running again.
229+ */
230+ String dataStreamName = "reindex_test_data_stream_ugprade_test" ;
231+ int numRollovers = randomIntBetween (0 , 5 );
232+ boolean hasILMPolicy = randomBoolean ();
233+ boolean ilmEnabled = hasILMPolicy && randomBoolean ();
234+ if (CLUSTER_TYPE == ClusterType .OLD ) {
235+ createAndRolloverDataStream (dataStreamName , numRollovers , hasILMPolicy , ilmEnabled );
236+ upgradeDataStream (dataStreamName , numRollovers , numRollovers + 1 , 0 , ilmEnabled );
237+ } else if (CLUSTER_TYPE == ClusterType .UPGRADED ) {
238+ makeSureNoUpgrade (dataStreamName );
239+ cancelReindexTask (dataStreamName );
240+ } else {
241+ makeSureNoUpgrade (dataStreamName );
242+ }
243+ }
244+
245+ private void cancelReindexTask (String dataStreamName ) throws IOException {
246+ Request cancelRequest = new Request ("POST" , "_migration/reindex/" + dataStreamName + "/_cancel" );
247+ String upgradeUser = "upgrade_user" ;
248+ String upgradeUserPassword = "x-pack-test-password" ;
249+ createRole ("upgrade_role" , dataStreamName );
250+ createUser (upgradeUser , upgradeUserPassword , "upgrade_role" );
251+ try (RestClient upgradeUserClient = getClient (upgradeUser , upgradeUserPassword )) {
252+ Response cancelResponse = upgradeUserClient .performRequest (cancelRequest );
253+ assertOK (cancelResponse );
254+ }
255+ }
256+
222257 private void compareIndexMetadata (
223258 Map <String , Map <String , Object >> oldIndicesMetadata ,
224259 Map <String , Map <String , Object >> upgradedIndicesMetadata
@@ -422,7 +457,10 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers
422457 "data_stream": {
423458 }
424459 }""" ;
425- var putIndexTemplateRequest = new Request ("POST" , "/_index_template/reindex_test_data_stream_template" );
460+ var putIndexTemplateRequest = new Request (
461+ "POST" ,
462+ "/_index_template/reindex_test_data_stream_template" + randomAlphanumericOfLength (10 ).toLowerCase (Locale .ROOT )
463+ );
426464 putIndexTemplateRequest .setJsonEntity (indexTemplate .replace ("$TEMPLATE" , template ).replace ("$PATTERN" , dataStreamName ));
427465 assertOK (client ().performRequest (putIndexTemplateRequest ));
428466 bulkLoadData (dataStreamName );
@@ -651,7 +689,7 @@ private void upgradeDataStream(
651689 assertOK (statusResponse );
652690 assertThat (statusResponseString , statusResponseMap .get ("complete" ), equalTo (true ));
653691 final int originalWriteIndex = 1 ;
654- if (isOriginalClusterSameMajorVersionAsCurrent ()) {
692+ if (isOriginalClusterSameMajorVersionAsCurrent () || CLUSTER_TYPE == ClusterType . OLD ) {
655693 assertThat (
656694 statusResponseString ,
657695 statusResponseMap .get ("total_indices_in_data_stream" ),
@@ -698,10 +736,35 @@ private void upgradeDataStream(
698736 // Verify it's possible to reindex again after a successful reindex
699737 reindexResponse = upgradeUserClient .performRequest (reindexRequest );
700738 assertOK (reindexResponse );
739+ }
740+ }
701741
702- Request cancelRequest = new Request ("POST" , "_migration/reindex/" + dataStreamName + "/_cancel" );
703- Response cancelResponse = upgradeUserClient .performRequest (cancelRequest );
704- assertOK (cancelResponse );
742+ private void makeSureNoUpgrade (String dataStreamName ) throws Exception {
743+ String upgradeUser = "upgrade_user" ;
744+ String upgradeUserPassword = "x-pack-test-password" ;
745+ createRole ("upgrade_role" , dataStreamName );
746+ createUser (upgradeUser , upgradeUserPassword , "upgrade_role" );
747+ try (RestClient upgradeUserClient = getClient (upgradeUser , upgradeUserPassword )) {
748+ assertBusy (() -> {
749+ try {
750+ Request statusRequest = new Request ("GET" , "_migration/reindex/" + dataStreamName + "/_status" );
751+ Response statusResponse = upgradeUserClient .performRequest (statusRequest );
752+ Map <String , Object > statusResponseMap = XContentHelper .convertToMap (
753+ JsonXContent .jsonXContent ,
754+ statusResponse .getEntity ().getContent (),
755+ false
756+ );
757+ String statusResponseString = statusResponseMap .keySet ()
758+ .stream ()
759+ .map (key -> key + "=" + statusResponseMap .get (key ))
760+ .collect (Collectors .joining (", " , "{" , "}" ));
761+ assertOK (statusResponse );
762+ assertThat (statusResponseString , statusResponseMap .get ("complete" ), equalTo (true ));
763+ assertThat (statusResponseString , statusResponseMap .get ("successes" ), equalTo (0 ));
764+ } catch (Exception e ) {
765+ fail (e );
766+ }
767+ }, 60 , TimeUnit .SECONDS );
705768 }
706769 }
707770
0 commit comments