From 2dd5e454ccca2b2b9ce502b660b3080cc5854ea9 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 20 Feb 2025 10:20:44 -0600 Subject: [PATCH] Updating TransportRolloverAction.checkBlock so that non-write-index blocks do not prevent data stream rollover (#122905) --- docs/changelog/122905.yaml | 6 + .../DataStreamLifecycleServiceIT.java | 16 +- .../rollover/TransportRolloverAction.java | 33 ++- .../TransportRolloverActionTests.java | 222 ++++++++++++++++++ 4 files changed, 260 insertions(+), 17 deletions(-) create mode 100644 docs/changelog/122905.yaml diff --git a/docs/changelog/122905.yaml b/docs/changelog/122905.yaml new file mode 100644 index 0000000000000..eccd50a759734 --- /dev/null +++ b/docs/changelog/122905.yaml @@ -0,0 +1,6 @@ +pr: 122905 +summary: Updating `TransportRolloverAction.checkBlock` so that non-write-index blocks + do not prevent data stream rollover +area: Data streams +type: bug +issues: [] diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index 1c00e3ad380dc..ddfc4561fe485 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -104,6 +104,7 @@ import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE; import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -785,14 +786,10 @@ public void testErrorRecordingOnRetention() throws Exception { ).get(); DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo(); assertThat(dslHealthInfoOnHealthNode, is(not(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS))); - // perhaps surprisingly rollover and delete are error-ing due to the read_only block on the first generation - // index which prevents metadata updates so rolling over the data stream is also blocked (note that both indices error at - // the same time so they'll have an equal retry count - the order becomes of the results, usually ordered by retry count, - // becomes non deterministic, hence the dynamic matching of index name) - assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(2)); + assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(1)); DslErrorInfo errorInfo = dslHealthInfoOnHealthNode.dslErrorsInfo().get(0); assertThat(errorInfo.retryCount(), greaterThanOrEqualTo(3)); - assertThat(List.of(firstGenerationIndex, secondGenerationIndex).contains(errorInfo.indexName()), is(true)); + assertThat(errorInfo.indexName(), equalTo(firstGenerationIndex)); }); GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000)) @@ -808,15 +805,12 @@ public void testErrorRecordingOnRetention() throws Exception { assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT)); assertThat( dslIndicator.symptom(), - is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle") + is("A backing index has repeatedly encountered errors whilst trying to advance in its lifecycle") ); Diagnosis diagnosis = dslIndicator.diagnosisList().get(0); assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF)); - assertThat( - diagnosis.affectedResources().get(0).getValues(), - containsInAnyOrder(firstGenerationIndex, secondGenerationIndex) - ); + assertThat(diagnosis.affectedResources().get(0).getValues(), contains(firstGenerationIndex)); } // let's mark the index as writeable and make sure it's deleted and the error store is empty diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index 8a6e84645a92f..ccf3c1696851b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -154,12 +154,33 @@ protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState .build(), IndicesOptions.GatekeeperOptions.DEFAULT ); - - return state.blocks() - .indicesBlockedException( - ClusterBlockLevel.METADATA_WRITE, - indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request) - ); + ResolvedExpression resolvedRolloverTarget = SelectorResolver.parseExpression(request.getRolloverTarget(), request.indicesOptions()); + final IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(resolvedRolloverTarget.resource()); + final String[] indicesToCheck; + if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) { + DataStream dataStream = (DataStream) indexAbstraction; + boolean targetFailureStore = resolvedRolloverTarget.selector() != null + && resolvedRolloverTarget.selector().shouldIncludeFailures(); + if (targetFailureStore == false) { + assert dataStream.getWriteIndex() != null : dataStream.getName() + " is a data stream but has no write index"; + assert dataStream.getWriteIndex().getName() != null + : dataStream.getName() + " is a data stream but the write index is null"; + indicesToCheck = new String[] { dataStream.getWriteIndex().getName() }; + } else if (dataStream.getWriteFailureIndex() != null) { + assert dataStream.getWriteFailureIndex().getName() != null + : "the write index for the data stream " + dataStream.getName() + " is null"; + indicesToCheck = new String[] { dataStream.getWriteFailureIndex().getName() }; + } else { + indicesToCheck = null; + } + } else { + indicesToCheck = indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request); + } + if (indicesToCheck == null) { + return null; + } else { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indicesToCheck); + } } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 657a03066ada9..5ea7968b0af6e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; @@ -19,11 +20,14 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -43,6 +47,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.cache.query.QueryCacheStats; @@ -578,6 +583,223 @@ public void testRolloverAliasToDataStreamFails() throws Exception { assertThat(illegalStateException.getMessage(), containsString("Aliases to data streams cannot be rolled over.")); } + public void testCheckBlockForIndices() { + final TransportRolloverAction transportRolloverAction = new TransportRolloverAction( + mock(TransportService.class), + mockClusterService, + mockThreadPool, + mockActionFilters, + mockIndexNameExpressionResolver, + rolloverService, + mockClient, + mockAllocationService, + mockMetadataDataStreamService, + dataStreamAutoShardingService + ); + final IndexMetadata.Builder indexMetadata1 = IndexMetadata.builder("my-index-1") + .putAlias(AliasMetadata.builder("my-alias").writeIndex(true).build()) + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(1); + final IndexMetadata indexMetadata2 = IndexMetadata.builder("my-index-2") + .settings(settings(IndexVersion.current()).put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().put(indexMetadata1).put(indexMetadata2, false)) + .blocks(ClusterBlocks.builder().addBlocks(indexMetadata2)) + .build(); + { + RolloverRequest rolloverRequest = new RolloverRequest("my-alias", "my-new-index"); + when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn( + new String[] { "my-index-1" } + ); + assertNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore)); + } + { + RolloverRequest rolloverRequest = new RolloverRequest("my-index-2", "my-new-index"); + when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn( + new String[] { "my-index-2" } + ); + assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore)); + } + } + + public void testCheckBlockForDataStreams() { + final TransportRolloverAction transportRolloverAction = new TransportRolloverAction( + mock(TransportService.class), + mockClusterService, + mockThreadPool, + mockActionFilters, + mockIndexNameExpressionResolver, + rolloverService, + mockClient, + mockAllocationService, + mockMetadataDataStreamService, + dataStreamAutoShardingService + ); + String dataStreamName = randomAlphaOfLength(20); + { + // First, make sure checkBlock returns null when there are no blocks + final ClusterState clusterState = createDataStream( + dataStreamName, + false, + false, + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null); + assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState)); + } + { + // Make sure checkBlock returns null when indices other than the write index have blocks + final ClusterState clusterState = createDataStream( + dataStreamName, + false, + true, + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null); + assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState)); + } + { + // Make sure checkBlock returns null when indices other than the write index have blocks and we use "::data" + final ClusterState clusterState = createDataStream( + dataStreamName, + false, + true, + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null); + assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState)); + } + { + // Make sure checkBlock returns an exception when the write index has a block + ClusterState clusterState = createDataStream( + dataStreamName, + true, + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null); + if (randomBoolean()) { + rolloverRequest.setIndicesOptions(IndicesOptions.lenientExpandOpenNoSelectors()); + } + ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState); + assertNotNull(e); + } + { + // Make sure checkBlock returns an exception when the write index has a block and we use "::data" + ClusterState clusterState = createDataStream( + dataStreamName, + true, + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null); + ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState); + assertNotNull(e); + } + } + + public void testCheckBlockForDataStreamFailureStores() { + final TransportRolloverAction transportRolloverAction = new TransportRolloverAction( + mock(TransportService.class), + mockClusterService, + mockThreadPool, + mockActionFilters, + mockIndexNameExpressionResolver, + rolloverService, + mockClient, + mockAllocationService, + mockMetadataDataStreamService, + dataStreamAutoShardingService + ); + String dataStreamName = randomAlphaOfLength(20); + { + // Make sure checkBlock returns no exception when there is no failure store block + ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, false); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null); + assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState)); + } + { + // Make sure checkBlock returns an exception when the failure store write index has a block + ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, true, randomBoolean()); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null); + assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState)); + } + { + // Make sure checkBlock returns no exception when failure store non-write indices have a block + ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, true); + RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null); + assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState)); + } + } + + private ClusterState createDataStream( + String dataStreamName, + boolean blockOnWriteIndex, + boolean blocksOnNonWriteIndices, + boolean includeFailureStore, + boolean blockOnFailureStoreWriteIndex, + boolean blockOnFailureStoreNonWriteIndices + ) { + ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT); + Metadata.Builder metadataBuilder = Metadata.builder(); + ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder(); + List indices = new ArrayList<>(); + int totalIndices = randomIntBetween(1, 20); + for (int i = 0; i < totalIndices; i++) { + Settings.Builder settingsBuilder = settings(IndexVersion.current()); + if ((blockOnWriteIndex && i == totalIndices - 1) || (blocksOnNonWriteIndices && i != totalIndices - 1)) { + settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true); + } + final IndexMetadata backingIndexMetadata = IndexMetadata.builder(".ds-logs-ds-00000" + (i + 1)) + .settings(settingsBuilder) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + metadataBuilder.put(backingIndexMetadata, false); + indices.add(backingIndexMetadata.getIndex()); + clusterBlocksBuilder.addBlocks(backingIndexMetadata); + } + + DataStream.Builder dataStreamBuilder = DataStream.builder(dataStreamName, indices) + .setMetadata(Map.of()) + .setIndexMode(randomFrom(IndexMode.values())); + if (includeFailureStore) { + List failureStoreIndices = new ArrayList<>(); + int totalFailureStoreIndices = randomIntBetween(1, 20); + for (int i = 0; i < totalFailureStoreIndices; i++) { + Settings.Builder settingsBuilder = settings(IndexVersion.current()); + if ((blockOnFailureStoreWriteIndex && i == totalFailureStoreIndices - 1) + || (blockOnFailureStoreNonWriteIndices && i != totalFailureStoreIndices - 1)) { + settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true); + } + final IndexMetadata failureStoreIndexMetadata = IndexMetadata.builder( + DataStream.getDefaultFailureStoreName(dataStreamName, i + 1, randomMillisUpToYear9999()) + ).settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build(); + failureStoreIndices.add(failureStoreIndexMetadata.getIndex()); + clusterBlocksBuilder.addBlocks(failureStoreIndexMetadata); + } + dataStreamBuilder.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build()); + } + clusterStateBuilder.blocks(clusterBlocksBuilder); + final DataStream dataStream = dataStreamBuilder.build(); + metadataBuilder.put(dataStream); + return clusterStateBuilder.metadata(metadataBuilder).build(); + } + private IndicesStatsResponse createIndicesStatResponse(String indexName, long totalDocs, long primariesDocs) { final CommonStats primaryStats = mock(CommonStats.class); when(primaryStats.getDocs()).thenReturn(new DocsStats(primariesDocs, 0, between(1, 10000)));