diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
index f0ffa0780de5b..456c61451f7f2 100644
--- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
+++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -44,6 +45,7 @@
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.ReindexAction;
 import org.elasticsearch.index.reindex.ReindexRequest;
+import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.Task;
@@ -275,7 +277,34 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener, TaskId parentTaskId) {
-        client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
+        ActionListener<BulkByScrollResponse> checkForFailuresListener = ActionListener.wrap(bulkByScrollResponse -> {
+            if (bulkByScrollResponse.getSearchFailures().isEmpty() == false) {
+                ScrollableHitSource.SearchFailure firstSearchFailure = bulkByScrollResponse.getSearchFailures().get(0);
+                listener.onFailure(
+                    new ElasticsearchException(
+                        "Failure reading data from {} caused by {}",
+                        firstSearchFailure.getReason(),
+                        sourceIndexName,
+                        firstSearchFailure.getReason().getMessage()
+                    )
+                );
+            } else if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
+                BulkItemResponse.Failure firstBulkFailure = bulkByScrollResponse.getBulkFailures().get(0);
+                listener.onFailure(
+                    new ElasticsearchException(
+                        "Failure loading data from {} into {} caused by {}",
+                        firstBulkFailure.getCause(),
+                        sourceIndexName,
+                        destIndexName,
+                        firstBulkFailure.getCause().getMessage()
+                    )
+                );
+            } else {
+                listener.onResponse(bulkByScrollResponse);
+            }
+        }, listener::onFailure);
+        client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener);
     }
 
     private void updateSettings(
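A note on the exception wrapping above: `ElasticsearchException(String msg, Throwable cause, Object... args)` fills the `{}` placeholders in `msg` from the trailing `args` and chains `cause`, so each reported failure carries both a formatted message and the underlying exception. A minimal sketch of how one of these messages renders; the index names and cause here are made up for illustration:

```java
// Sketch only: illustrates the message formatting used above; names are hypothetical.
Exception cause = new RuntimeException("data stream timestamp field [@timestamp] is missing");
ElasticsearchException e = new ElasticsearchException(
    "Failure loading data from {} into {} caused by {}",
    cause,              // becomes e.getCause()
    "old-index",        // fills the first {}
    "new-index",        // fills the second {}
    cause.getMessage()  // fills the third {}
);
// e.getMessage() -> "Failure loading data from old-index into new-index caused by
//                    data stream timestamp field [@timestamp] is missing"
```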
diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java
index 99e1031dec3a2..55e4da30cdf11 100644
--- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java
+++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java
@@ -30,6 +30,7 @@
 
 import java.util.Collections;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.when;
@@ -111,7 +112,7 @@ public void testReindexIncludesRateLimit() {
             )
         );
 
-        doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));
+        doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
 
         action.reindex(sourceIndex, destIndex, listener, taskId);
@@ -136,7 +137,7 @@ public void testReindexIncludesInfiniteRateLimit() {
                 Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
             )
         );
-        doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));
+        doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
 
         action.reindex(sourceIndex, destIndex, listener, taskId);
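The matcher change from `eq(listener)` to `any()` is needed because `reindex()` no longer hands the caller's listener to `client.execute(...)` directly; it passes the wrapping `checkForFailuresListener`, which is a different instance, so an exact-instance matcher would no longer match. If a test ever needed to drive the wrapped listener rather than just capture the request, a stub along these lines should work (a sketch assuming Mockito's `doAnswer` and `mock` are imported; this is not part of the change):

```java
// Sketch: complete the wrapped listener with a failure-free response so the original
// listener's onResponse path is exercised. A Mockito mock of BulkByScrollResponse
// returns empty lists from getSearchFailures()/getBulkFailures() by default.
doAnswer(invocation -> {
    ActionListener<BulkByScrollResponse> wrapped = invocation.getArgument(2);
    wrapped.onResponse(mock(BulkByScrollResponse.class));
    return null;
}).when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any());
```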
diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
index 2d229d7ffece5..e55e35ae0932a 100644
--- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
+++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
@@ -183,12 +183,19 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception {
     }
 
     public void testUpgradeDataStream() throws Exception {
+        /*
+         * This test covers upgrading a "normal" data stream (dataStreamName), and upgrading a data stream that was originally just an
+         * ordinary index that was converted to a data stream (dataStreamFromNonDataStreamIndices).
+         */
         String dataStreamName = "reindex_test_data_stream";
+        String dataStreamFromNonDataStreamIndices = "index_first_reindex_test_data_stream";
         int numRollovers = randomIntBetween(0, 5);
         if (CLUSTER_TYPE == ClusterType.OLD) {
             createAndRolloverDataStream(dataStreamName, numRollovers);
+            createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices);
         } else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
-            upgradeDataStream(dataStreamName, numRollovers);
+            upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0);
+            upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 0, 1);
         }
     }
@@ -266,7 +273,116 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRollovers) {
         }
     }
 
-    private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
+    private void createDataStreamFromNonDataStreamIndices(String dataStreamFromNonDataStreamIndices) throws IOException {
+        /*
+         * This method creates an index, creates an alias to that index, and then converts the aliased index into a data stream. This is
+         * similar to the path that many indices (including system indices) took in versions 7/8.
+         */
+        // First, we create an ordinary index with no @timestamp mapping:
+        final String templateWithNoTimestamp = """
+            {
+              "mappings":{
+                "properties": {
+                  "message": {
+                    "type": "text"
+                  }
+                }
+              }
+            }
+            """;
+        // Note that this is not a data stream template:
+        final String indexTemplate = """
+            {
+              "index_patterns": ["$PATTERN"],
+              "template": $TEMPLATE
+            }""";
+        var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_index_template");
+        putIndexTemplateRequest.setJsonEntity(
+            indexTemplate.replace("$TEMPLATE", templateWithNoTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices + "-*")
+        );
+        assertOK(client().performRequest(putIndexTemplateRequest));
+        String indexName = dataStreamFromNonDataStreamIndices + "-01";
+        bulkLoadDataMissingTimestamp(indexName);
+        /*
+         * Next, we change the index's mapping to include a @timestamp field, since we are going to convert it to a data stream. But first
+         * we have to flush the translog to disk, because adding a @timestamp field causes errors if it is done before the translog has
+         * been flushed:
+         */
+        assertOK(client().performRequest(new Request("POST", indexName + "/_flush")));
+        ensureHealth(indexName, (request -> {
+            request.addParameter("wait_for_nodes", "3");
+            request.addParameter("wait_for_status", "green");
+            request.addParameter("timeout", "70s");
+            request.addParameter("level", "shards");
+        }));
+
+        // Updating the mapping to include @timestamp:
+        Request updateIndexMappingRequest = new Request("PUT", indexName + "/_mapping");
+        updateIndexMappingRequest.setJsonEntity("""
+            {
+              "properties": {
+                "@timestamp" : {
+                  "type": "date"
+                },
+                "message": {
+                  "type": "text"
+                }
+              }
+            }""");
+        assertOK(client().performRequest(updateIndexMappingRequest));
+
+        // Creating an alias with the same name that the data stream will have:
+        Request createAliasRequest = new Request("POST", "/_aliases");
+        String aliasRequestBody = """
+            {
+              "actions": [
+                {
+                  "add": {
+                    "index": "$index",
+                    "alias": "$alias"
+                  }
+                }
+              ]
+            }""";
+        createAliasRequest.setJsonEntity(
+            aliasRequestBody.replace("$index", indexName).replace("$alias", dataStreamFromNonDataStreamIndices)
+        );
+        assertOK(client().performRequest(createAliasRequest));
+
+        // This is now just an aliased index. We'll convert it into a data stream:
+        final String templateWithTimestamp = """
+            {
+              "mappings":{
+                "properties": {
+                  "@timestamp" : {
+                    "type": "date"
+                  },
+                  "message": {
+                    "type": "text"
+                  }
+                }
+              }
+            }
+            """;
+        final String dataStreamTemplate = """
+            {
+              "index_patterns": ["$PATTERN"],
+              "template": $TEMPLATE,
+              "data_stream": {
+              }
+            }""";
+        var putDataStreamTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_data_stream_template");
+        putDataStreamTemplateRequest.setJsonEntity(
+            dataStreamTemplate.replace("$TEMPLATE", templateWithTimestamp).replace("$PATTERN", dataStreamFromNonDataStreamIndices)
+        );
+        assertOK(client().performRequest(putDataStreamTemplateRequest));
+        Request migrateToDataStreamRequest = new Request("POST", "/_data_stream/_migrate/" + dataStreamFromNonDataStreamIndices);
+        assertOK(client().performRequest(migrateToDataStreamRequest));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster, int expectedSuccessesCount, int expectedErrorCount)
+        throws Exception {
         Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
         final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
         for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
@@ -334,16 +450,19 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
                         statusResponseMap.get("total_indices_requiring_upgrade"),
                         equalTo(originalWriteIndex + numRolloversOnOldCluster)
                     );
-                    assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
+                    assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(expectedSuccessesCount));
                     // We expect all the original indices to have been deleted
-                    for (String oldIndex : indicesNeedingUpgrade) {
-                        assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
+                    if (expectedErrorCount == 0) {
+                        for (String oldIndex : indicesNeedingUpgrade) {
+                            assertThat(statusResponseString, indexExists(oldIndex), equalTo(false));
+                        }
                     }
                     assertThat(
                         statusResponseString,
                         getDataStreamIndices(dataStreamName).size(),
                         equalTo(expectedTotalIndicesInDataStream)
                    );
+                    assertThat(statusResponseString, ((List<Object>) statusResponseMap.get("errors")).size(), equalTo(expectedErrorCount));
                 }
             }, 60, TimeUnit.SECONDS);
             Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
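Why the converted data stream is expected to report one error and zero successes: three of the four documents loaded by `bulkLoadDataMissingTimestamp` (shown in the next hunk) have no `@timestamp` field, and a data stream's backing index requires one, so reindexing the original index appears to fail on those documents and the failure now surfaces through `checkForFailuresListener`. A standalone illustration of the underlying rejection (a hypothetical request, not part of the test):

```java
// Sketch: indexing a document without @timestamp into a data stream is rejected
// with a 400; this is the same constraint that makes the reindex report an error.
Request doc = new Request("POST", "/" + dataStreamFromNonDataStreamIndices + "/_doc");
doc.setJsonEntity("{\"metricset\": \"pod\"}"); // no @timestamp field
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(doc));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
```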
@@ -399,6 +518,26 @@ private static void bulkLoadData(String dataStreamName) throws IOException {
         assertOK(response);
     }
 
+    /*
+     * This bulk-loads data, where some documents have no @timestamp field and some do.
+     */
+    private static void bulkLoadDataMissingTimestamp(String dataStreamName) throws IOException {
+        final String bulk = """
+            {"create": {}}
+            {"metricset": "pod", "k8s": {"pod": {"name": "cat", "network": {"tx": 2001818691, "rx": 802133794}}}}
+            {"create": {}}
+            {"metricset": "pod", "k8s": {"pod": {"name": "hamster", "network": {"tx": 2005177954, "rx": 801479970}}}}
+            {"create": {}}
+            {"metricset": "pod", "k8s": {"pod": {"name": "cow", "network": {"tx": 2006223737, "rx": 802337279}}}}
+            {"create": {}}
+            {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "rat", "network": {"tx": 2012916202, "rx": 803685721}}}}
+            """;
+        var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
+        bulkRequest.setJsonEntity(bulk.replace("$now", formatInstant(Instant.now())));
+        var response = client().performRequest(bulkRequest);
+        assertOK(response);
+    }
+
     static String formatInstant(Instant instant) {
         return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
     }
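For reference, `formatInstant` formats with the `strict_date_optional_time` pattern, so the `$now` placeholder in the bulk body is replaced with an ISO-8601 timestamp. A sketch of the expected rendering, assuming that format's standard printer emits millisecond-precision UTC:

```java
// Sketch: expected shape of the formatted timestamp (an assumption about the
// strict_date_optional_time printer, not verified against every ES version).
String ts = formatInstant(Instant.parse("2025-01-15T10:30:00Z"));
// ts -> "2025-01-15T10:30:00.000Z"
```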