From 44411e4b9fc8986c7c8577f17742e8080fad836a Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 2 Oct 2024 14:56:51 +0100 Subject: [PATCH 01/11] Add data stream template validation to snapshot restore --- .../snapshots/RestoreService.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index e901dc28ea541..9c9da59f928ac 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; @@ -398,6 +399,8 @@ private void startRestore( Map dataStreamsToRestore = result.v1(); Map dataStreamAliasesToRestore = result.v2(); + validateDatastreamTemplatesExistAndWarnIfMissing(dataStreamsToRestore, snapshotInfo); + // Remove the data streams from the list of requested indices requestIndices.removeAll(dataStreamsToRestore.keySet()); @@ -510,6 +513,30 @@ private void startRestore( ); } + private void validateDatastreamTemplatesExistAndWarnIfMissing(Map dataStreamsToRestore, SnapshotInfo snapshotInfo) { + var templatePatterns = clusterService.state() + .metadata() + .templatesV2() + .values() + .stream() + .filter(cit -> cit.getDataStreamTemplate() != null) + .flatMap(cit -> cit.indexPatterns().stream()) + .collect(Collectors.toSet()); + + for (String name : dataStreamsToRestore.keySet()) { + if (templatePatterns.stream().noneMatch(pattern -> Regex.simpleMatch(pattern, name))) { + String warningMessage = format( + "Snapshot [%s] contains data stream [%s] but custer does not have a matching index template. This will cause" + + " rollover to fail until a matching index template is created", + snapshotInfo.snapshotId(), + name + ); + logger.warn(() -> warningMessage); + HeaderWarning.addWarning(warningMessage); + } + } + } + @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { clusterService.submitUnbatchedStateUpdateTask(source, task); From ebe2d5d2ce9422b43569f52a089bb6eddc239511 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 2 Oct 2024 15:32:31 +0100 Subject: [PATCH 02/11] Add data stream template validation to data stream promotion endpoint --- .../PromoteDataStreamTransportAction.java | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java index b9f5bdea9c90e..dd597030f3f0d 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java @@ -8,6 +8,8 @@ */ package org.elasticsearch.datastreams.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; @@ -23,6 +25,8 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.logging.HeaderWarning; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.indices.SystemIndices; @@ -31,9 +35,14 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static org.elasticsearch.core.Strings.format; + public class PromoteDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(PromoteDataStreamTransportAction.class); + private final SystemIndices systemIndices; + private final ClusterService clusterService; @Inject public PromoteDataStreamTransportAction( @@ -55,6 +64,7 @@ public PromoteDataStreamTransportAction( EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.systemIndices = systemIndices; + this.clusterService = clusterService; } @Override @@ -76,7 +86,7 @@ public void onFailure(Exception e) { @Override public ClusterState execute(ClusterState currentState) { - return promoteDataStream(currentState, request); + return promoteDataStream(currentState, request, clusterService); } @Override @@ -92,18 +102,48 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String clusterService.submitUnbatchedStateUpdateTask(source, task); } - static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) { + static ClusterState promoteDataStream( + ClusterState currentState, + PromoteDataStreamAction.Request request, + ClusterService clusterService + ) { DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName()); + if (dataStream == null) { throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist"); } + warnIfTemplateMissingForDatastream(dataStream, clusterService); + DataStream promotedDataStream = dataStream.promoteDataStream(); Metadata.Builder metadata = Metadata.builder(currentState.metadata()); metadata.put(promotedDataStream); return ClusterState.builder(currentState).metadata(metadata).build(); } + private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ClusterService clusterService) { + var datastreamName = dataStream.getName(); + + var matchingIndex = clusterService.state() + .metadata() + .templatesV2() + .values() + .stream() + .filter(cit -> cit.getDataStreamTemplate() != null) + .flatMap(cit -> cit.indexPatterns().stream()) + .anyMatch(pattern -> Regex.simpleMatch(pattern, datastreamName)); + + if (matchingIndex == false) { + String warningMessage = format( + "Data stream [%s] does not have a matching index template. This will cause rollover to fail until a matching index " + + "template is created", + datastreamName + ); + logger.warn(() -> warningMessage); + HeaderWarning.addWarning(warningMessage); + } + } + @Override protected ClusterBlockException checkBlock(PromoteDataStreamAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); From 3ca998790eb4f8ae299e363b679a0dd934d65695 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 9 Oct 2024 14:55:08 +0100 Subject: [PATCH 03/11] Add new assertion for response headers Add a new assertion to synchronously execute a request and check the response contains a specific warning header --- .../hamcrest/ElasticsearchAssertions.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index ad61a63f7c46e..a9f659e0226d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -14,7 +14,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.RequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -873,6 +875,74 @@ public static void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) assertThat(message, isCountedDown, is(true)); } + /** + * Check the response of a client request to ensure it has a warning header that contains the provided string + * + * Currently, this allows a fixed 10 seconds for response to be received + * @param client Client making the request - Required to access the response headers + * @param requestBuilder Request to be made + * @param toMatch String to match in the warning headers + * @param Type of the response + * @throws InterruptedException If the request times out + */ + public static void assertWarningHeaderOnResponse( + Client client, + ActionRequestBuilder requestBuilder, + String toMatch + ) throws InterruptedException { + assertWarningHeaderMatchOnResponse(client, requestBuilder, hasItem(containsString(toMatch))); + } + + /** + * Check the response of a client request to ensure it does not have a warning header that contains the provided string + * + * Currently, this allows a fixed 10 seconds for response to be received + * @param client Client making the request - Required to access the response headers + * @param requestBuilder Request to be made + * @param toMatch String to not match in the warning headers + * @param Type of the response + * @throws InterruptedException If the request times out + */ + public static void assertNoWarningHeaderOnResponse( + Client client, + ActionRequestBuilder requestBuilder, + String toMatch + ) throws InterruptedException { + assertWarningHeaderMatchOnResponse(client, requestBuilder, not(hasItem(containsString(toMatch)))); + } + + private static void assertWarningHeaderMatchOnResponse( + Client client, + ActionRequestBuilder requestBuilder, + Matcher> matcher + ) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + requestBuilder.execute(new ActionListener<>() { + @Override + public void onResponse(T response) { + try { + final var warningHeaders = client.threadPool().getThreadContext().getResponseHeaders().get("Warning"); + assertThat(warningHeaders, matcher); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError("Failed to execute request", e); + } finally { + latch.countDown(); + } + } + }); + latch.countDown(); + if (latch.await(10, TimeUnit.SECONDS) == false) { + fail("Did not receive request response before timeout"); + } + } + /** * Compares two maps recursively, using arrays comparisons for byte[] through Arrays.equals(byte[], byte[]) */ From cae9f35c6879d6526c9c43d1b99ec35cbb63dc75 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 9 Oct 2024 14:55:58 +0100 Subject: [PATCH 04/11] Test for warning header on snapshot restore When missing templates --- .../datastreams/DataStreamsSnapshotsIT.java | 106 +++++++++++++++++- 1 file changed, 104 insertions(+), 2 deletions(-) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java index 36fb02dcff0d8..505c68650559d 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; @@ -45,6 +46,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotRestoreException; @@ -62,7 +64,9 @@ import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoWarningHeaderOnResponse; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertWarningHeaderOnResponse; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -80,6 +84,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase { private static final Map DOCUMENT_SOURCE = Collections.singletonMap("@timestamp", 123); public static final String REPO = "repo"; public static final String SNAPSHOT = "snap"; + public static final String TEMPLATE_1_ID = "t1"; + public static final String TEMPLATE_2_ID = "t2"; private Client client; private String dsBackingIndexName; @@ -103,8 +109,8 @@ public void setup() throws Exception { Path location = randomRepoPath(); createRepository(REPO, "fs", location); - DataStreamIT.putComposableIndexTemplate("t1", List.of("ds", "other-ds")); - DataStreamIT.putComposableIndexTemplate("t2", """ + DataStreamIT.putComposableIndexTemplate(TEMPLATE_1_ID, List.of("ds", "other-ds")); + DataStreamIT.putComposableIndexTemplate(TEMPLATE_2_ID, """ { "properties": { "@timestamp": { @@ -1335,4 +1341,100 @@ public void testRestoreDataStreamAliasWithConflictingIndicesAlias() throws Excep ); assertThat(e.getMessage(), containsString("data stream alias and indices alias have the same name (my-alias)")); } + + public void testWarningHeaderOnRestoreWithoutTemplates() throws Exception { + String datastreamName = "ds"; + + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices(datastreamName) + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId(); + assertEquals(RestStatus.OK, status); + + assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices()); + + assertAcked( + client.execute( + DeleteDataStreamAction.INSTANCE, + new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds", "with-fs") + ) + ); + + assertAcked( + client.execute( + TransportDeleteComposableIndexTemplateAction.TYPE, + new TransportDeleteComposableIndexTemplateAction.Request(TEMPLATE_1_ID) + ).get() + ); + + assertAcked( + client.execute( + TransportDeleteComposableIndexTemplateAction.TYPE, + new TransportDeleteComposableIndexTemplateAction.Request(TEMPLATE_2_ID) + ).get() + ); + + RestoreSnapshotRequestBuilder request = client.admin() + .cluster() + .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices(datastreamName); + + assertWarningHeaderOnResponse( + client, + request, + "Snapshot [" + + snapshotId + + "] contains data stream [" + + datastreamName + + "] but custer does not have a matching index " + + "template. This will cause rollover to fail until a matching index template is created" + ); + + } + + public void testWarningHeaderAbsentOnRestoreWithTemplates() throws Exception { + String datastreamName = "ds"; + + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices(datastreamName) + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId(); + assertEquals(RestStatus.OK, status); + + assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices()); + + assertAcked( + client.execute( + DeleteDataStreamAction.INSTANCE, + new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds", "with-fs") + ) + ); + + RestoreSnapshotRequestBuilder request = client.admin() + .cluster() + .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices(datastreamName); + + assertNoWarningHeaderOnResponse( + client, + request, + "but custer does not have a matching index template. This will cause rollover to fail until a matching index " + + "template is created" + ); + + } } From c287da956bdd7f38feb69042df1ef97e150f30d9 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 10 Oct 2024 15:22:14 +0100 Subject: [PATCH 05/11] Test for promotion warnings --- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 6f66e7e386066..ce7d4cbb4e08a 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.WarningFailureException; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.SecureString; @@ -1120,6 +1121,115 @@ public void testAutoFollowSearchableSnapshotsFails() throws Exception { } } + public void testNoWarningOnPromoteDatastreamWhenTemplateExistsOnFollower() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + testDatastreamPromotionWarnings(true); + } + + public void testWarningOnPromoteDatastreamWhenTemplateDoesNotExistsOnFollower() { + if ("follow".equals(targetCluster) == false) { + return; + } + WarningFailureException exception = assertThrows(WarningFailureException.class, () -> testDatastreamPromotionWarnings(false)); + assertThat( + exception.getMessage(), + containsString( + "does not have a matching index template. " + "This will cause rollover to fail until a matching index template is created]" + ) + ); + } + + private void testDatastreamPromotionWarnings(Boolean createFollowerTemplate) throws Exception { + final int numDocs = 64; + final String dataStreamName = getTestName().toLowerCase(Locale.ROOT) + "-dopromo"; + final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT); + + int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + List backingIndexNames = null; + try { + // Create index template + Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/" + getTestName().toLowerCase(Locale.ROOT)); + putComposableIndexTemplateRequest.setJsonEntity("{\"index_patterns\":[\"" + dataStreamName + "*\"],\"data_stream\":{}}"); + + if (createFollowerTemplate) { + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + } + + // Create auto follow pattern + createAutoFollowPattern(client(), autoFollowPatternName, dataStreamName + "*", "leader_cluster", null); + + // Create data stream and ensure that it is auto followed + try (var leaderClient = buildLeaderClient()) { + assertOK(leaderClient.performRequest(putComposableIndexTemplateRequest)); + + for (int i = 0; i < numDocs; i++) { + var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + } + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1)); + verifyDocuments(leaderClient, dataStreamName, numDocs); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1)); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1)); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs); + }); + + // Rollover in leader cluster and ensure second backing index is replicated: + try (var leaderClient = buildLeaderClient()) { + var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover"); + assertOK(leaderClient.performRequest(rolloverRequest)); + verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + + var indexRequest = new Request("POST", "/" + dataStreamName + "/_doc"); + indexRequest.addParameter("refresh", "true"); + indexRequest.setJsonEntity("{\"@timestamp\": \"" + DATE_FORMAT.format(new Date()) + "\",\"message\":\"abc\"}"); + assertOK(leaderClient.performRequest(indexRequest)); + verifyDocuments(leaderClient, dataStreamName, numDocs + 1); + } + assertBusy(() -> { + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 2)); + verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + ensureYellow(dataStreamName); + verifyDocuments(client(), dataStreamName, numDocs + 1); + }); + + backingIndexNames = verifyDataStream( + client(), + dataStreamName, + backingIndexName(dataStreamName, 1), + backingIndexName(dataStreamName, 2) + ); + + // Promote local data stream + var promoteRequest = new Request("POST", "/_data_stream/_promote/" + dataStreamName); + Response response = client().performRequest(promoteRequest); + assertOK(response); + } finally { + if (backingIndexNames == null) { + // we failed to compute the actual backing index names in the test because we failed earlier on, guessing them on a + // best-effort basis + backingIndexNames = List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); + } + + cleanUpFollower(backingIndexNames, List.of(dataStreamName), List.of(autoFollowPatternName)); + cleanUpLeader(backingIndexNames.subList(0, 1), List.of(dataStreamName), List.of()); + Request deleteTemplateRequest = new Request("DELETE", "/_index_template/" + getTestName().toLowerCase(Locale.ROOT)); + if (createFollowerTemplate) { + assertOK(client().performRequest(deleteTemplateRequest)); + } + try (var leaderClient = buildLeaderClient()) { + assertOK(leaderClient.performRequest(deleteTemplateRequest)); + } + } + + } + private int getNumberOfSuccessfulFollowedIndices() throws IOException { return getNumberOfSuccessfulFollowedIndices(client()); } From 797e1ec9ca5125c421e27c21ea7bcff3cc37b0a9 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Fri, 11 Oct 2024 10:31:47 +0100 Subject: [PATCH 06/11] Add documentation for the potential error states --- docs/reference/data-streams/promote-data-stream-api.asciidoc | 3 +++ docs/reference/snapshot-restore/index.asciidoc | 2 ++ 2 files changed, 5 insertions(+) diff --git a/docs/reference/data-streams/promote-data-stream-api.asciidoc b/docs/reference/data-streams/promote-data-stream-api.asciidoc index 111c7a2256f8a..46d52b4a09b75 100644 --- a/docs/reference/data-streams/promote-data-stream-api.asciidoc +++ b/docs/reference/data-streams/promote-data-stream-api.asciidoc @@ -18,6 +18,9 @@ available, the data stream in the local cluster can be promoted to a regular data stream, which allows these data streams to be rolled over in the local cluster. +NOTE: When promoting a data stream, ensure the local cluster has a data stream enabled index template that matches the data stream. +If this is missing, the data stream will not be able to roll over until a matching index template is created. + [source,console] ---- POST /_data_stream/_promote/my-data-stream diff --git a/docs/reference/snapshot-restore/index.asciidoc b/docs/reference/snapshot-restore/index.asciidoc index 390f6664391bd..92e0ef3123efa 100644 --- a/docs/reference/snapshot-restore/index.asciidoc +++ b/docs/reference/snapshot-restore/index.asciidoc @@ -48,6 +48,8 @@ Snapshots don't contain or back up: * Node configuration files * <> +NOTE: When restoring a data stream, if the target cluster does not have an index template that matches the data stream, the data stream will not be able to roll over until a matching index template is created. + [discrete] [[feature-state]] === Feature states From 68447979f7454cb857add4b7cd2b14c0adffbb9f Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 14 Oct 2024 13:10:02 +0100 Subject: [PATCH 07/11] PR changes --- .../data-streams/promote-data-stream-api.asciidoc | 1 + docs/reference/snapshot-restore/index.asciidoc | 1 + .../datastreams/DataStreamsSnapshotsIT.java | 9 +-------- .../action/PromoteDataStreamTransportAction.java | 11 +++++------ .../test/hamcrest/ElasticsearchAssertions.java | 3 +-- .../org/elasticsearch/xpack/ccr/AutoFollowIT.java | 8 ++++---- 6 files changed, 13 insertions(+), 20 deletions(-) diff --git a/docs/reference/data-streams/promote-data-stream-api.asciidoc b/docs/reference/data-streams/promote-data-stream-api.asciidoc index 46d52b4a09b75..5ba9c4d9fad0e 100644 --- a/docs/reference/data-streams/promote-data-stream-api.asciidoc +++ b/docs/reference/data-streams/promote-data-stream-api.asciidoc @@ -20,6 +20,7 @@ be rolled over in the local cluster. NOTE: When promoting a data stream, ensure the local cluster has a data stream enabled index template that matches the data stream. If this is missing, the data stream will not be able to roll over until a matching index template is created. +This will affect the lifecycle management of the data stream and interfere with the data stream size and retention. [source,console] ---- diff --git a/docs/reference/snapshot-restore/index.asciidoc b/docs/reference/snapshot-restore/index.asciidoc index 92e0ef3123efa..33645034c30a1 100644 --- a/docs/reference/snapshot-restore/index.asciidoc +++ b/docs/reference/snapshot-restore/index.asciidoc @@ -49,6 +49,7 @@ Snapshots don't contain or back up: * <> NOTE: When restoring a data stream, if the target cluster does not have an index template that matches the data stream, the data stream will not be able to roll over until a matching index template is created. +This will affect the lifecycle management of the data stream and interfere with the data stream size and retention. [discrete] [[feature-state]] diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java index 505c68650559d..d455e62c39903 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java @@ -1362,7 +1362,7 @@ public void testWarningHeaderOnRestoreWithoutTemplates() throws Exception { assertAcked( client.execute( DeleteDataStreamAction.INSTANCE, - new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds", "with-fs") + new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds") ) ); @@ -1373,13 +1373,6 @@ public void testWarningHeaderOnRestoreWithoutTemplates() throws Exception { ).get() ); - assertAcked( - client.execute( - TransportDeleteComposableIndexTemplateAction.TYPE, - new TransportDeleteComposableIndexTemplateAction.Request(TEMPLATE_2_ID) - ).get() - ); - RestoreSnapshotRequestBuilder request = client.admin() .cluster() .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java index dd597030f3f0d..eda010d09b28a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java @@ -86,7 +86,7 @@ public void onFailure(Exception e) { @Override public ClusterState execute(ClusterState currentState) { - return promoteDataStream(currentState, request, clusterService); + return promoteDataStream(currentState, request); } @Override @@ -104,8 +104,7 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String static ClusterState promoteDataStream( ClusterState currentState, - PromoteDataStreamAction.Request request, - ClusterService clusterService + PromoteDataStreamAction.Request request ) { DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName()); @@ -113,7 +112,7 @@ static ClusterState promoteDataStream( throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist"); } - warnIfTemplateMissingForDatastream(dataStream, clusterService); + warnIfTemplateMissingForDatastream(dataStream, currentState); DataStream promotedDataStream = dataStream.promoteDataStream(); Metadata.Builder metadata = Metadata.builder(currentState.metadata()); @@ -121,10 +120,10 @@ static ClusterState promoteDataStream( return ClusterState.builder(currentState).metadata(metadata).build(); } - private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ClusterService clusterService) { + private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ClusterState currentState) { var datastreamName = dataStream.getName(); - var matchingIndex = clusterService.state() + var matchingIndex = currentState .metadata() .templatesV2() .values() diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index a9f659e0226d1..552e301650d9d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -916,7 +916,7 @@ private static void assertWarningHeaderMatchOnRespons ActionRequestBuilder requestBuilder, Matcher> matcher ) throws InterruptedException { - CountDownLatch latch = new CountDownLatch(2); + CountDownLatch latch = new CountDownLatch(1); requestBuilder.execute(new ActionListener<>() { @Override public void onResponse(T response) { @@ -937,7 +937,6 @@ public void onFailure(Exception e) { } } }); - latch.countDown(); if (latch.await(10, TimeUnit.SECONDS) == false) { fail("Did not receive request response before timeout"); } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index ce7d4cbb4e08a..77af985a3e7bb 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -1125,23 +1125,23 @@ public void testNoWarningOnPromoteDatastreamWhenTemplateExistsOnFollower() throw if ("follow".equals(targetCluster) == false) { return; } - testDatastreamPromotionWarnings(true); + testDataStreamPromotionWarnings(true); } public void testWarningOnPromoteDatastreamWhenTemplateDoesNotExistsOnFollower() { if ("follow".equals(targetCluster) == false) { return; } - WarningFailureException exception = assertThrows(WarningFailureException.class, () -> testDatastreamPromotionWarnings(false)); + WarningFailureException exception = assertThrows(WarningFailureException.class, () -> testDataStreamPromotionWarnings(false)); assertThat( exception.getMessage(), containsString( - "does not have a matching index template. " + "This will cause rollover to fail until a matching index template is created]" + "does not have a matching index template. This will cause rollover to fail until a matching index template is created]" ) ); } - private void testDatastreamPromotionWarnings(Boolean createFollowerTemplate) throws Exception { + private void testDataStreamPromotionWarnings(Boolean createFollowerTemplate) throws Exception { final int numDocs = 64; final String dataStreamName = getTestName().toLowerCase(Locale.ROOT) + "-dopromo"; final String autoFollowPatternName = getTestName().toLowerCase(Locale.ROOT); From 7d430dfd0f0153dc3590bfc16a7d26afed2e55a8 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 14 Oct 2024 13:23:37 +0100 Subject: [PATCH 08/11] Spotless reformatting --- .../action/PromoteDataStreamTransportAction.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java index eda010d09b28a..8b4ab3e2df3c9 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java @@ -102,10 +102,7 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String clusterService.submitUnbatchedStateUpdateTask(source, task); } - static ClusterState promoteDataStream( - ClusterState currentState, - PromoteDataStreamAction.Request request - ) { + static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) { DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName()); if (dataStream == null) { @@ -123,8 +120,7 @@ static ClusterState promoteDataStream( private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ClusterState currentState) { var datastreamName = dataStream.getName(); - var matchingIndex = currentState - .metadata() + var matchingIndex = currentState.metadata() .templatesV2() .values() .stream() From 5646cbba0e51f004274ef08e1765b62c68ceb9bb Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 14 Oct 2024 16:10:47 +0100 Subject: [PATCH 09/11] Add logic to look in snapshot global metadata This checks if the snapshot contains a matching template for the DS --- .../datastreams/DataStreamsSnapshotsIT.java | 56 +++++++++++++++++++ .../snapshots/RestoreService.java | 22 +++++--- 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java index d455e62c39903..638e4d813a79a 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java @@ -1430,4 +1430,60 @@ public void testWarningHeaderAbsentOnRestoreWithTemplates() throws Exception { ); } + + /** + * This test is muted as it's awaiting the same fix as {@link #testPartialRestoreSnapshotThatIncludesDataStreamWithGlobalState()} + */ + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/107515") + public void testWarningHeaderOnRestoreTemplateFromSnapshot() throws Exception { + String datastreamName = "ds"; + + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices(datastreamName) + .setIncludeGlobalState(true) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId(); + assertEquals(RestStatus.OK, status); + + assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices()); + + assertAcked( + client.execute( + DeleteDataStreamAction.INSTANCE, + new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds") + ) + ); + + assertAcked( + client.execute( + TransportDeleteComposableIndexTemplateAction.TYPE, + new TransportDeleteComposableIndexTemplateAction.Request(TEMPLATE_1_ID) + ).get() + ); + + RestoreSnapshotRequestBuilder request = client.admin() + .cluster() + .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setRestoreGlobalState(true) + .setIndices(datastreamName); + + assertNoWarningHeaderOnResponse( + client, + request, + "Snapshot [" + + snapshotId + + "] contains data stream [" + + datastreamName + + "] but custer does not have a matching index " + + "template. This will cause rollover to fail until a matching index template is created" + ); + + } + } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 9c9da59f928ac..cf023b0e629c6 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAlias; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -399,7 +400,7 @@ private void startRestore( Map dataStreamsToRestore = result.v1(); Map dataStreamAliasesToRestore = result.v2(); - validateDatastreamTemplatesExistAndWarnIfMissing(dataStreamsToRestore, snapshotInfo); + validateDataStreamTemplatesExistAndWarnIfMissing(dataStreamsToRestore, snapshotInfo, globalMetadata); // Remove the data streams from the list of requested indices requestIndices.removeAll(dataStreamsToRestore.keySet()); @@ -513,13 +514,18 @@ private void startRestore( ); } - private void validateDatastreamTemplatesExistAndWarnIfMissing(Map dataStreamsToRestore, SnapshotInfo snapshotInfo) { - var templatePatterns = clusterService.state() - .metadata() - .templatesV2() - .values() - .stream() - .filter(cit -> cit.getDataStreamTemplate() != null) + private void validateDataStreamTemplatesExistAndWarnIfMissing( + Map dataStreamsToRestore, + SnapshotInfo snapshotInfo, + Metadata globalMetadata + ) { + + Stream streams = Stream.concat( + clusterService.state().metadata().templatesV2().values().stream(), + globalMetadata == null ? Stream.empty() : globalMetadata.templatesV2().values().stream() + ); + + Set templatePatterns = streams.filter(cit -> cit.getDataStreamTemplate() != null) .flatMap(cit -> cit.indexPatterns().stream()) .collect(Collectors.toSet()); From c6d9167bbf8d135db8709b0dc816035499277466 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 15 Oct 2024 09:37:23 +0100 Subject: [PATCH 10/11] Comment on test cleanup to explain it was copied --- .../src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 77af985a3e7bb..9303191fcf75f 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -1217,6 +1217,8 @@ private void testDataStreamPromotionWarnings(Boolean createFollowerTemplate) thr backingIndexNames = List.of(backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2)); } + // These cleanup methods are copied from the finally block of other Data Stream tests in this class however + // they may no longer be required but have been included for completeness cleanUpFollower(backingIndexNames, List.of(dataStreamName), List.of(autoFollowPatternName)); cleanUpLeader(backingIndexNames.subList(0, 1), List.of(dataStreamName), List.of()); Request deleteTemplateRequest = new Request("DELETE", "/_index_template/" + getTestName().toLowerCase(Locale.ROOT)); From 5bcfb5eeb15c12d348295e14bf80da7b04e0a866 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 15 Oct 2024 09:40:02 +0100 Subject: [PATCH 11/11] Removed cluster service field --- .../datastreams/action/PromoteDataStreamTransportAction.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java index 8b4ab3e2df3c9..edc17433ab746 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java @@ -42,7 +42,6 @@ public class PromoteDataStreamTransportAction extends AcknowledgedTransportMaste private static final Logger logger = LogManager.getLogger(PromoteDataStreamTransportAction.class); private final SystemIndices systemIndices; - private final ClusterService clusterService; @Inject public PromoteDataStreamTransportAction( @@ -64,7 +63,6 @@ public PromoteDataStreamTransportAction( EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.systemIndices = systemIndices; - this.clusterService = clusterService; } @Override