@@ -183,12 +183,19 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception {
183183 }
184184
185185 public void testUpgradeDataStream () throws Exception {
186+ /*
187+ * This test tests upgrading a "normal" data stream (dataStreamName), and upgrading a data stream that was originally just an
188+ * ordinary index that was converted to a data stream (dataStreamFromNonDataStreamIndices).
189+ */
186190 String dataStreamName = "reindex_test_data_stream" ;
191+ String dataStreamFromNonDataStreamIndices = "index_first_reindex_test_data_stream" ;
187192 int numRollovers = randomIntBetween (0 , 5 );
188193 if (CLUSTER_TYPE == ClusterType .OLD ) {
189194 createAndRolloverDataStream (dataStreamName , numRollovers );
195+ createDataStreamFromNonDataStreamIndices (dataStreamFromNonDataStreamIndices );
190196 } else if (CLUSTER_TYPE == ClusterType .UPGRADED ) {
191- upgradeDataStream (dataStreamName , numRollovers );
197+ upgradeDataStream (dataStreamName , numRollovers , numRollovers + 1 , 0 );
198+ upgradeDataStream (dataStreamFromNonDataStreamIndices , 0 , 0 , 1 );
192199 }
193200 }
194201
@@ -266,7 +273,116 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
266273 }
267274 }
268275
269- private void upgradeDataStream (String dataStreamName , int numRolloversOnOldCluster ) throws Exception {
276+ private void createDataStreamFromNonDataStreamIndices (String dataStreamFromNonDataStreamIndices ) throws IOException {
277+ /*
278+ * This method creates an index, creates an alias to that index, and then converts the aliased index into a data stream. This is
279+ * similar to the path that many indices (including system indices) took in versions 7/8.
280+ */
281+ // First, we create an ordinary index with no @timestamp mapping:
282+ final String templateWithNoTimestamp = """
283+ {
284+ "mappings":{
285+ "properties": {
286+ "message": {
287+ "type": "text"
288+ }
289+ }
290+ }
291+ }
292+ """ ;
293+ // Note that this is not a data stream template:
294+ final String indexTemplate = """
295+ {
296+ "index_patterns": ["$PATTERN"],
297+ "template": $TEMPLATE
298+ }""" ;
299+ var putIndexTemplateRequest = new Request ("POST" , "/_index_template/reindex_test_data_stream_index_template" );
300+ putIndexTemplateRequest .setJsonEntity (
301+ indexTemplate .replace ("$TEMPLATE" , templateWithNoTimestamp ).replace ("$PATTERN" , dataStreamFromNonDataStreamIndices + "-*" )
302+ );
303+ assertOK (client ().performRequest (putIndexTemplateRequest ));
304+ String indexName = dataStreamFromNonDataStreamIndices + "-01" ;
305+ bulkLoadDataMissingTimestamp (indexName );
306+ /*
307+ * Next, we will change the index's mapping to include a @timestamp field since we are going to convert it to a data stream. But
308+ * first we have to flush the translog to disk because adding a @timestamp field will cause errors if it is done before the translog
309+ * is flushed:
310+ */
311+ assertOK (client ().performRequest (new Request ("POST" , indexName + "/_flush" )));
312+ ensureHealth (indexName , (request -> {
313+ request .addParameter ("wait_for_nodes" , "3" );
314+ request .addParameter ("wait_for_status" , "green" );
315+ request .addParameter ("timeout" , "70s" );
316+ request .addParameter ("level" , "shards" );
317+ }));
318+
319+ // Updating the mapping to include @timestamp:
320+ Request updateIndexMappingRequest = new Request ("PUT" , indexName + "/_mapping" );
321+ updateIndexMappingRequest .setJsonEntity ("""
322+ {
323+ "properties": {
324+ "@timestamp" : {
325+ "type": "date"
326+ },
327+ "message": {
328+ "type": "text"
329+ }
330+ }
331+ }""" );
332+ assertOK (client ().performRequest (updateIndexMappingRequest ));
333+
334+ // Creating an alias with the same name that the data stream will have:
335+ Request createAliasRequest = new Request ("POST" , "/_aliases" );
336+ String aliasRequestBody = """
337+ {
338+ "actions": [
339+ {
340+ "add": {
341+ "index": "$index",
342+ "alias": "$alias"
343+ }
344+ }
345+ ]
346+ }""" ;
347+ createAliasRequest .setJsonEntity (
348+ aliasRequestBody .replace ("$index" , indexName ).replace ("$alias" , dataStreamFromNonDataStreamIndices )
349+ );
350+ assertOK (client ().performRequest (createAliasRequest ));
351+
352+ // This is now just an aliased index. We'll convert it into a data stream
353+ final String templateWithTimestamp = """
354+ {
355+ "mappings":{
356+ "properties": {
357+ "@timestamp" : {
358+ "type": "date"
359+ },
360+ "message": {
361+ "type": "text"
362+ }
363+ }
364+ }
365+ }
366+ """ ;
367+ final String dataStreamTemplate = """
368+ {
369+ "index_patterns": ["$PATTERN"],
370+ "template": $TEMPLATE,
371+ "data_stream": {
372+ }
373+ }""" ;
374+ var putDataStreamTemplateRequest = new Request ("POST" , "/_index_template/reindex_test_data_stream_data_stream_template" );
375+ putDataStreamTemplateRequest .setJsonEntity (
376+ dataStreamTemplate .replace ("$TEMPLATE" , templateWithTimestamp ).replace ("$PATTERN" , dataStreamFromNonDataStreamIndices )
377+ );
378+ assertOK (client ().performRequest (putDataStreamTemplateRequest ));
379+ Request migrateToDataStreamRequest = new Request ("POST" , "/_data_stream/_migrate/" + dataStreamFromNonDataStreamIndices );
380+ assertOK (client ().performRequest (migrateToDataStreamRequest ));
381+ }
382+
383+ @ SuppressWarnings ("unchecked" )
384+ private void upgradeDataStream (String dataStreamName , int numRolloversOnOldCluster , int expectedSuccessesCount , int expectedErrorCount )
385+ throws Exception {
270386 Set <String > indicesNeedingUpgrade = getDataStreamIndices (dataStreamName );
271387 final int explicitRolloverOnNewClusterCount = randomIntBetween (0 , 2 );
272388 for (int i = 0 ; i < explicitRolloverOnNewClusterCount ; i ++) {
@@ -334,16 +450,19 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
334450 statusResponseMap .get ("total_indices_requiring_upgrade" ),
335451 equalTo (originalWriteIndex + numRolloversOnOldCluster )
336452 );
337- assertThat (statusResponseString , statusResponseMap .get ("successes" ), equalTo (numRolloversOnOldCluster + 1 ));
453+ assertThat (statusResponseString , statusResponseMap .get ("successes" ), equalTo (expectedSuccessesCount ));
338454 // We expect all the original indices to have been deleted
339- for (String oldIndex : indicesNeedingUpgrade ) {
340- assertThat (statusResponseString , indexExists (oldIndex ), equalTo (false ));
455+ if (expectedErrorCount == 0 ) {
456+ for (String oldIndex : indicesNeedingUpgrade ) {
457+ assertThat (statusResponseString , indexExists (oldIndex ), equalTo (false ));
458+ }
341459 }
342460 assertThat (
343461 statusResponseString ,
344462 getDataStreamIndices (dataStreamName ).size (),
345463 equalTo (expectedTotalIndicesInDataStream )
346464 );
465+ assertThat (statusResponseString , ((List <Object >) statusResponseMap .get ("errors" )).size (), equalTo (expectedErrorCount ));
347466 }
348467 }, 60 , TimeUnit .SECONDS );
349468 Request cancelRequest = new Request ("POST" , "_migration/reindex/" + dataStreamName + "/_cancel" );
@@ -399,6 +518,26 @@ private static void bulkLoadData(String dataStreamName) throws IOException {
399518 assertOK (response );
400519 }
401520
521+ /*
522+ * This bulkloads data, where some documents have no @timestamp field and some do.
523+ */
524+ private static void bulkLoadDataMissingTimestamp (String dataStreamName ) throws IOException {
525+ final String bulk = """
526+ {"create": {}}
527+ {"metricset": "pod", "k8s": {"pod": {"name": "cat", "network": {"tx": 2001818691, "rx": 802133794}}}}
528+ {"create": {}}
529+ {"metricset": "pod", "k8s": {"pod": {"name": "hamster", "network": {"tx": 2005177954, "rx": 801479970}}}}
530+ {"create": {}}
531+ {"metricset": "pod", "k8s": {"pod": {"name": "cow", "network": {"tx": 2006223737, "rx": 802337279}}}}
532+ {"create": {}}
533+ {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "network": {"tx": 2012916202, "rx": 803685721}}}}
534+ """ ;
535+ var bulkRequest = new Request ("POST" , "/" + dataStreamName + "/_bulk" );
536+ bulkRequest .setJsonEntity (bulk .replace ("$now" , formatInstant (Instant .now ())));
537+ var response = client ().performRequest (bulkRequest );
538+ assertOK (response );
539+ }
540+
402541 static String formatInstant (Instant instant ) {
403542 return DateFormatter .forPattern (FormatNames .STRICT_DATE_OPTIONAL_TIME .getName ()).format (instant );
404543 }
0 commit comments