3434import  java .nio .charset .StandardCharsets ;
3535import  java .time .Instant ;
3636import  java .util .List ;
37+ import  java .util .Locale ;
3738import  java .util .Map ;
3839import  java .util .Set ;
3940import  java .util .concurrent .TimeUnit ;
@@ -208,7 +209,9 @@ public void testUpgradeDataStream() throws Exception {
208209        } else  if  (CLUSTER_TYPE  == ClusterType .UPGRADED ) {
209210            Map <String , Map <String , Object >> oldIndicesMetadata  = getIndicesMetadata (dataStreamName );
210211            upgradeDataStream (dataStreamName , numRollovers , numRollovers  + 1 , 0 , ilmEnabled );
212+             cancelReindexTask (dataStreamName );
211213            upgradeDataStream (dataStreamFromNonDataStreamIndices , 0 , 1 , 0 , ilmEnabled );
214+             cancelReindexTask (dataStreamFromNonDataStreamIndices );
212215            Map <String , Map <String , Object >> upgradedIndicesMetadata  = getIndicesMetadata (dataStreamName );
213216
214217            if  (ilmEnabled ) {
@@ -219,6 +222,38 @@ public void testUpgradeDataStream() throws Exception {
219222        }
220223    }
221224
225+     public  void  testMigrateDoesNotRestartOnUpgrade () throws  Exception  {
226+         /* 
227+          * This test makes sure that if reindex is run and completed, then when the cluster is upgraded the task 
228+          * does not begin running again. 
229+          */ 
230+         String  dataStreamName  = "reindex_test_data_stream_ugprade_test" ;
231+         int  numRollovers  = randomIntBetween (0 , 5 );
232+         boolean  hasILMPolicy  = randomBoolean ();
233+         boolean  ilmEnabled  = hasILMPolicy  && randomBoolean ();
234+         if  (CLUSTER_TYPE  == ClusterType .OLD ) {
235+             createAndRolloverDataStream (dataStreamName , numRollovers , hasILMPolicy , ilmEnabled );
236+             upgradeDataStream (dataStreamName , numRollovers , numRollovers  + 1 , 0 , ilmEnabled );
237+         } else  if  (CLUSTER_TYPE  == ClusterType .UPGRADED ) {
238+             makeSureNoUpgrade (dataStreamName );
239+             cancelReindexTask (dataStreamName );
240+         } else  {
241+             makeSureNoUpgrade (dataStreamName );
242+         }
243+     }
244+ 
245+     private  void  cancelReindexTask (String  dataStreamName ) throws  IOException  {
246+         Request  cancelRequest  = new  Request ("POST" , "_migration/reindex/"  + dataStreamName  + "/_cancel" );
247+         String  upgradeUser  = "upgrade_user" ;
248+         String  upgradeUserPassword  = "x-pack-test-password" ;
249+         createRole ("upgrade_role" , dataStreamName );
250+         createUser (upgradeUser , upgradeUserPassword , "upgrade_role" );
251+         try  (RestClient  upgradeUserClient  = getClient (upgradeUser , upgradeUserPassword )) {
252+             Response  cancelResponse  = upgradeUserClient .performRequest (cancelRequest );
253+             assertOK (cancelResponse );
254+         }
255+     }
256+ 
222257    private  void  compareIndexMetadata (
223258        Map <String , Map <String , Object >> oldIndicesMetadata ,
224259        Map <String , Map <String , Object >> upgradedIndicesMetadata 
@@ -422,7 +457,10 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers
422457                "data_stream": { 
423458                } 
424459            }""" ;
425-         var  putIndexTemplateRequest  = new  Request ("POST" , "/_index_template/reindex_test_data_stream_template" );
460+         var  putIndexTemplateRequest  = new  Request (
461+             "POST" ,
462+             "/_index_template/reindex_test_data_stream_template"  + randomAlphanumericOfLength (10 ).toLowerCase (Locale .ROOT )
463+         );
426464        putIndexTemplateRequest .setJsonEntity (indexTemplate .replace ("$TEMPLATE" , template ).replace ("$PATTERN" , dataStreamName ));
427465        assertOK (client ().performRequest (putIndexTemplateRequest ));
428466        bulkLoadData (dataStreamName );
@@ -651,7 +689,7 @@ private void upgradeDataStream(
651689                assertOK (statusResponse );
652690                assertThat (statusResponseString , statusResponseMap .get ("complete" ), equalTo (true ));
653691                final  int  originalWriteIndex  = 1 ;
654-                 if  (isOriginalClusterSameMajorVersionAsCurrent ()) {
692+                 if  (isOriginalClusterSameMajorVersionAsCurrent () ||  CLUSTER_TYPE  ==  ClusterType . OLD ) {
655693                    assertThat (
656694                        statusResponseString ,
657695                        statusResponseMap .get ("total_indices_in_data_stream" ),
@@ -698,10 +736,35 @@ private void upgradeDataStream(
698736            // Verify it's possible to reindex again after a successful reindex 
699737            reindexResponse  = upgradeUserClient .performRequest (reindexRequest );
700738            assertOK (reindexResponse );
739+         }
740+     }
701741
702-             Request  cancelRequest  = new  Request ("POST" , "_migration/reindex/"  + dataStreamName  + "/_cancel" );
703-             Response  cancelResponse  = upgradeUserClient .performRequest (cancelRequest );
704-             assertOK (cancelResponse );
742+     private  void  makeSureNoUpgrade (String  dataStreamName ) throws  Exception  {
743+         String  upgradeUser  = "upgrade_user" ;
744+         String  upgradeUserPassword  = "x-pack-test-password" ;
745+         createRole ("upgrade_role" , dataStreamName );
746+         createUser (upgradeUser , upgradeUserPassword , "upgrade_role" );
747+         try  (RestClient  upgradeUserClient  = getClient (upgradeUser , upgradeUserPassword )) {
748+             assertBusy (() -> {
749+                 try  {
750+                     Request  statusRequest  = new  Request ("GET" , "_migration/reindex/"  + dataStreamName  + "/_status" );
751+                     Response  statusResponse  = upgradeUserClient .performRequest (statusRequest );
752+                     Map <String , Object > statusResponseMap  = XContentHelper .convertToMap (
753+                         JsonXContent .jsonXContent ,
754+                         statusResponse .getEntity ().getContent (),
755+                         false 
756+                     );
757+                     String  statusResponseString  = statusResponseMap .keySet ()
758+                         .stream ()
759+                         .map (key  -> key  + "="  + statusResponseMap .get (key ))
760+                         .collect (Collectors .joining (", " , "{" , "}" ));
761+                     assertOK (statusResponse );
762+                     assertThat (statusResponseString , statusResponseMap .get ("complete" ), equalTo (true ));
763+                     assertThat (statusResponseString , statusResponseMap .get ("successes" ), equalTo (0 ));
764+                 } catch  (Exception  e ) {
765+                     fail (e );
766+                 }
767+             }, 60 , TimeUnit .SECONDS );
705768        }
706769    }
707770
0 commit comments