88
99import org .apache .http .util .EntityUtils ;
1010import org .elasticsearch .Build ;
11+ import org .elasticsearch .TransportVersions ;
1112import org .elasticsearch .Version ;
1213import org .elasticsearch .client .Node ;
1314import org .elasticsearch .client .Request ;
@@ -194,15 +195,79 @@ public void testUpgradeDataStream() throws Exception {
194195 createAndRolloverDataStream (dataStreamName , numRollovers );
195196 createDataStreamFromNonDataStreamIndices (dataStreamFromNonDataStreamIndices );
196197 } else if (CLUSTER_TYPE == ClusterType .UPGRADED ) {
198+ Map <String , Map <String , Object >> oldIndicesMetadata = getIndicesMetadata (dataStreamName );
197199 upgradeDataStream (dataStreamName , numRollovers , numRollovers + 1 , 0 );
198200 upgradeDataStream (dataStreamFromNonDataStreamIndices , 0 , 1 , 0 );
201+ Map <String , Map <String , Object >> upgradedIndicesMetadata = getIndicesMetadata (dataStreamName );
202+ compareIndexMetadata (oldIndicesMetadata , upgradedIndicesMetadata );
199203 }
200204 }
201205
202- private static void createAndRolloverDataStream (String dataStreamName , int numRollovers ) throws IOException {
206+ private void compareIndexMetadata (
207+ Map <String , Map <String , Object >> oldIndicesMetadata ,
208+ Map <String , Map <String , Object >> upgradedIndicesMetadata
209+ ) {
210+ for (Map .Entry <String , Map <String , Object >> upgradedIndexEntry : upgradedIndicesMetadata .entrySet ()) {
211+ String upgradedIndexName = upgradedIndexEntry .getKey ();
212+ if (upgradedIndexName .startsWith (".migrated-" )) {
213+ String oldIndexName = "." + upgradedIndexName .substring (".migrated-" .length ());
214+ Map <String , Object > oldIndexMetadata = oldIndicesMetadata .get (oldIndexName );
215+ Map <String , Object > upgradedIndexMetadata = upgradedIndexEntry .getValue ();
216+ compareSettings (oldIndexMetadata , upgradedIndexMetadata );
217+ assertThat ("Mappings did not match" , upgradedIndexMetadata .get ("mappings" ), equalTo (oldIndexMetadata .get ("mappings" )));
218+ // TODO: Uncomment the following two checks once we are correctly copying this state over:
219+ // assertThat("ILM states did not match", upgradedIndexMetadata.get("ilm"), equalTo(oldIndexMetadata.get("ilm")));
220+ // assertThat(
221+ // "Rollover info did not match",
222+ // upgradedIndexMetadata.get("rollover_info"),
223+ // equalTo(oldIndexMetadata.get("rollover_info"))
224+ // );
225+ assertThat (upgradedIndexMetadata .get ("system" ), equalTo (oldIndexMetadata .get ("system" )));
226+ }
227+ }
228+ }
229+
230+ private void compareSettings (Map <String , Object > oldIndexMetadata , Map <String , Object > upgradedIndexMetadata ) {
231+ Map <String , Object > oldIndexSettings = getIndexSettingsFromIndexMetadata (oldIndexMetadata );
232+ Map <String , Object > upgradedIndexSettings = getIndexSettingsFromIndexMetadata (upgradedIndexMetadata );
233+ final Set <String > SETTINGS_TO_CHECK = Set .of (
234+ "lifecycle" ,
235+ "mode" ,
236+ "routing" ,
237+ "hidden" ,
238+ "number_of_shards" ,
239+ // "creation_date", TODO: Uncomment this once we are correctly copying over this setting
240+ "number_of_replicas"
241+ );
242+ for (String setting : SETTINGS_TO_CHECK ) {
243+ assertThat (
244+ "Unexpected value for setting " + setting ,
245+ upgradedIndexSettings .get (setting ),
246+ equalTo (oldIndexSettings .get (setting ))
247+ );
248+ }
249+ }
250+
251+ @ SuppressWarnings ("unchecked" )
252+ private Map <String , Object > getIndexSettingsFromIndexMetadata (Map <String , Object > indexMetadata ) {
253+ return (Map <String , Object >) ((Map <String , Object >) indexMetadata .get ("settings" )).get ("index" );
254+ }
255+
256+ private void createAndRolloverDataStream (String dataStreamName , int numRollovers ) throws IOException {
257+ boolean useIlm = minimumTransportVersion ().before (TransportVersions .V_8_9_X ) || randomBoolean ();
258+ if (useIlm ) {
259+ createIlmPolicy ();
260+ }
203261 // We want to create a data stream and roll it over several times so that we have several indices to upgrade
204- final String template = """
262+ String template = """
205263 {
264+ "settings":{
265+ "index": {
266+ $ILM_SETTING
267+ "number_of_replicas": 0
268+ }
269+ },
270+ $DSL_TEMPLATE
206271 "mappings":{
207272 "dynamic_templates": [
208273 {
@@ -248,6 +313,19 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
248313 }
249314 }
250315 """ ;
316+ if (useIlm ) {
317+ template = template .replace ("$ILM_SETTING" , """
318+ "lifecycle.name": "test-lifecycle-policy",
319+ """ );
320+ template = template .replace ("$DSL_TEMPLATE" , "" );
321+ } else {
322+ template = template .replace ("$ILM_SETTING" , "" );
323+ template = template .replace ("$DSL_TEMPLATE" , """
324+ "lifecycle": {
325+ "data_retention": "7d"
326+ },
327+ """ );
328+ }
251329 final String indexTemplate = """
252330 {
253331 "index_patterns": ["$PATTERN"],
@@ -268,6 +346,52 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
268346 }
269347 }
270348
349+ private static void createIlmPolicy () throws IOException {
350+ String ilmPolicy = """
351+ {
352+ "policy": {
353+ "phases": {
354+ "hot": {
355+ "actions": {
356+ "rollover": {
357+ "max_primary_shard_size": "50kb"
358+ }
359+ }
360+ },
361+ "warm": {
362+ "min_age": "30d",
363+ "actions": {
364+ "shrink": {
365+ "number_of_shards": 1
366+ },
367+ "forcemerge": {
368+ "max_num_segments": 1
369+ }
370+ }
371+ }
372+ }
373+ }
374+ }""" ;
375+ Request putIlmPolicyRequest = new Request ("PUT" , "_ilm/policy/test-lifecycle-policy" );
376+ putIlmPolicyRequest .setJsonEntity (ilmPolicy );
377+ assertOK (client ().performRequest (putIlmPolicyRequest ));
378+ }
379+
380+ /*
381+ * This returns a Map of index metadata for each index in the data stream, as retrieved from the cluster state.
382+ */
383+ @ SuppressWarnings ("unchecked" )
384+ private Map <String , Map <String , Object >> getIndicesMetadata (String dataStreamName ) throws IOException {
385+ Request getClusterStateRequest = new Request ("GET" , "/_cluster/state/metadata/" + dataStreamName );
386+ Response clusterStateResponse = client ().performRequest (getClusterStateRequest );
387+ Map <String , Object > clusterState = XContentHelper .convertToMap (
388+ JsonXContent .jsonXContent ,
389+ clusterStateResponse .getEntity ().getContent (),
390+ false
391+ );
392+ return ((Map <String , Map <String , Map <String , Object >>>) clusterState .get ("metadata" )).get ("indices" );
393+ }
394+
271395 private void createDataStreamFromNonDataStreamIndices (String dataStreamFromNonDataStreamIndices ) throws IOException {
272396 /*
273397 * This method creates an index, creates an alias to that index, and then converts the aliased index into a data stream. This is
0 commit comments