Skip to content

Commit 0e42877

Browse files
authored
Updating TransportRolloverAction.checkBlock so that non-write-index blocks do not prevent data stream rollover (elastic#122905) (elastic#123068)
1 parent 2ee3027 commit 0e42877

File tree

4 files changed

+260
-17
lines changed

4 files changed

+260
-17
lines changed

docs/changelog/122905.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 122905
2+
summary: Updating `TransportRolloverAction.checkBlock` so that non-write-index blocks
3+
do not prevent data stream rollover
4+
area: Data streams
5+
type: bug
6+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
105105
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
106106
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
107+
import static org.hamcrest.Matchers.contains;
107108
import static org.hamcrest.Matchers.containsInAnyOrder;
108109
import static org.hamcrest.Matchers.containsString;
109110
import static org.hamcrest.Matchers.equalTo;
@@ -785,14 +786,10 @@ public void testErrorRecordingOnRetention() throws Exception {
785786
).get();
786787
DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo();
787788
assertThat(dslHealthInfoOnHealthNode, is(not(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS)));
788-
// perhaps surprisingly rollover and delete are error-ing due to the read_only block on the first generation
789-
// index which prevents metadata updates so rolling over the data stream is also blocked (note that both indices error at
790-
// the same time so they'll have an equal retry count - the order of the results, usually ordered by retry count,
791-
// becomes non deterministic, hence the dynamic matching of index name)
792-
assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(2));
789+
assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(1));
793790
DslErrorInfo errorInfo = dslHealthInfoOnHealthNode.dslErrorsInfo().get(0);
794791
assertThat(errorInfo.retryCount(), greaterThanOrEqualTo(3));
795-
assertThat(List.of(firstGenerationIndex, secondGenerationIndex).contains(errorInfo.indexName()), is(true));
792+
assertThat(errorInfo.indexName(), equalTo(firstGenerationIndex));
796793
});
797794

798795
GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000))
@@ -808,15 +805,12 @@ public void testErrorRecordingOnRetention() throws Exception {
808805
assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT));
809806
assertThat(
810807
dslIndicator.symptom(),
811-
is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle")
808+
is("A backing index has repeatedly encountered errors whilst trying to advance in its lifecycle")
812809
);
813810

814811
Diagnosis diagnosis = dslIndicator.diagnosisList().get(0);
815812
assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF));
816-
assertThat(
817-
diagnosis.affectedResources().get(0).getValues(),
818-
containsInAnyOrder(firstGenerationIndex, secondGenerationIndex)
819-
);
813+
assertThat(diagnosis.affectedResources().get(0).getValues(), contains(firstGenerationIndex));
820814
}
821815

822816
// let's mark the index as writeable and make sure it's deleted and the error store is empty

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,33 @@ protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState
154154
.build(),
155155
IndicesOptions.GatekeeperOptions.DEFAULT
156156
);
157-
158-
return state.blocks()
159-
.indicesBlockedException(
160-
ClusterBlockLevel.METADATA_WRITE,
161-
indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request)
162-
);
157+
ResolvedExpression resolvedRolloverTarget = SelectorResolver.parseExpression(request.getRolloverTarget(), request.indicesOptions());
158+
final IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(resolvedRolloverTarget.resource());
159+
final String[] indicesToCheck;
160+
if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
161+
DataStream dataStream = (DataStream) indexAbstraction;
162+
boolean targetFailureStore = resolvedRolloverTarget.selector() != null
163+
&& resolvedRolloverTarget.selector().shouldIncludeFailures();
164+
if (targetFailureStore == false) {
165+
assert dataStream.getWriteIndex() != null : dataStream.getName() + " is a data stream but has no write index";
166+
assert dataStream.getWriteIndex().getName() != null
167+
: dataStream.getName() + " is a data stream but the write index is null";
168+
indicesToCheck = new String[] { dataStream.getWriteIndex().getName() };
169+
} else if (dataStream.getWriteFailureIndex() != null) {
170+
assert dataStream.getWriteFailureIndex().getName() != null
171+
: "the write index for the data stream " + dataStream.getName() + " is null";
172+
indicesToCheck = new String[] { dataStream.getWriteFailureIndex().getName() };
173+
} else {
174+
indicesToCheck = null;
175+
}
176+
} else {
177+
indicesToCheck = indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request);
178+
}
179+
if (indicesToCheck == null) {
180+
return null;
181+
} else {
182+
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indicesToCheck);
183+
}
163184
}
164185

165186
@Override

server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ActionRequest;
14+
import org.elasticsearch.action.IndicesRequest;
1415
import org.elasticsearch.action.admin.indices.stats.CommonStats;
1516
import org.elasticsearch.action.admin.indices.stats.IndexStats;
1617
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@@ -19,11 +20,14 @@
1920
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2021
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
2122
import org.elasticsearch.action.support.ActionFilters;
23+
import org.elasticsearch.action.support.IndicesOptions;
2224
import org.elasticsearch.action.support.PlainActionFuture;
2325
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2426
import org.elasticsearch.client.internal.Client;
2527
import org.elasticsearch.cluster.ClusterName;
2628
import org.elasticsearch.cluster.ClusterState;
29+
import org.elasticsearch.cluster.block.ClusterBlockException;
30+
import org.elasticsearch.cluster.block.ClusterBlocks;
2731
import org.elasticsearch.cluster.metadata.AliasMetadata;
2832
import org.elasticsearch.cluster.metadata.DataStream;
2933
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -44,6 +48,7 @@
4448
import org.elasticsearch.common.unit.ByteSizeValue;
4549
import org.elasticsearch.core.TimeValue;
4650
import org.elasticsearch.features.FeatureService;
51+
import org.elasticsearch.index.Index;
4752
import org.elasticsearch.index.IndexMode;
4853
import org.elasticsearch.index.IndexVersion;
4954
import org.elasticsearch.index.cache.query.QueryCacheStats;
@@ -580,6 +585,223 @@ public void testRolloverAliasToDataStreamFails() throws Exception {
580585
assertThat(illegalStateException.getMessage(), containsString("Aliases to data streams cannot be rolled over."));
581586
}
582587

588+
public void testCheckBlockForIndices() {
589+
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
590+
mock(TransportService.class),
591+
mockClusterService,
592+
mockThreadPool,
593+
mockActionFilters,
594+
mockIndexNameExpressionResolver,
595+
rolloverService,
596+
mockClient,
597+
mockAllocationService,
598+
mockMetadataDataStreamService,
599+
dataStreamAutoShardingService
600+
);
601+
final IndexMetadata.Builder indexMetadata1 = IndexMetadata.builder("my-index-1")
602+
.putAlias(AliasMetadata.builder("my-alias").writeIndex(true).build())
603+
.settings(settings(IndexVersion.current()))
604+
.numberOfShards(1)
605+
.numberOfReplicas(1);
606+
final IndexMetadata indexMetadata2 = IndexMetadata.builder("my-index-2")
607+
.settings(settings(IndexVersion.current()).put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true))
608+
.numberOfShards(1)
609+
.numberOfReplicas(1)
610+
.build();
611+
final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT)
612+
.metadata(Metadata.builder().put(indexMetadata1).put(indexMetadata2, false))
613+
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata2))
614+
.build();
615+
{
616+
RolloverRequest rolloverRequest = new RolloverRequest("my-alias", "my-new-index");
617+
when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
618+
new String[] { "my-index-1" }
619+
);
620+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
621+
}
622+
{
623+
RolloverRequest rolloverRequest = new RolloverRequest("my-index-2", "my-new-index");
624+
when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
625+
new String[] { "my-index-2" }
626+
);
627+
assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
628+
}
629+
}
630+
631+
public void testCheckBlockForDataStreams() {
632+
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
633+
mock(TransportService.class),
634+
mockClusterService,
635+
mockThreadPool,
636+
mockActionFilters,
637+
mockIndexNameExpressionResolver,
638+
rolloverService,
639+
mockClient,
640+
mockAllocationService,
641+
mockMetadataDataStreamService,
642+
dataStreamAutoShardingService
643+
);
644+
String dataStreamName = randomAlphaOfLength(20);
645+
{
646+
// First, make sure checkBlock returns null when there are no blocks
647+
final ClusterState clusterState = createDataStream(
648+
dataStreamName,
649+
false,
650+
false,
651+
randomBoolean(),
652+
randomBoolean(),
653+
randomBoolean()
654+
);
655+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
656+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
657+
}
658+
{
659+
// Make sure checkBlock returns null when indices other than the write index have blocks
660+
final ClusterState clusterState = createDataStream(
661+
dataStreamName,
662+
false,
663+
true,
664+
randomBoolean(),
665+
randomBoolean(),
666+
randomBoolean()
667+
);
668+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
669+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
670+
}
671+
{
672+
// Make sure checkBlock returns null when indices other than the write index have blocks and we use "::data"
673+
final ClusterState clusterState = createDataStream(
674+
dataStreamName,
675+
false,
676+
true,
677+
randomBoolean(),
678+
randomBoolean(),
679+
randomBoolean()
680+
);
681+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
682+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
683+
}
684+
{
685+
// Make sure checkBlock returns an exception when the write index has a block
686+
ClusterState clusterState = createDataStream(
687+
dataStreamName,
688+
true,
689+
randomBoolean(),
690+
randomBoolean(),
691+
randomBoolean(),
692+
randomBoolean()
693+
);
694+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
695+
if (randomBoolean()) {
696+
rolloverRequest.setIndicesOptions(IndicesOptions.lenientExpandOpenNoSelectors());
697+
}
698+
ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
699+
assertNotNull(e);
700+
}
701+
{
702+
// Make sure checkBlock returns an exception when the write index has a block and we use "::data"
703+
ClusterState clusterState = createDataStream(
704+
dataStreamName,
705+
true,
706+
randomBoolean(),
707+
randomBoolean(),
708+
randomBoolean(),
709+
randomBoolean()
710+
);
711+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
712+
ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
713+
assertNotNull(e);
714+
}
715+
}
716+
717+
public void testCheckBlockForDataStreamFailureStores() {
718+
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
719+
mock(TransportService.class),
720+
mockClusterService,
721+
mockThreadPool,
722+
mockActionFilters,
723+
mockIndexNameExpressionResolver,
724+
rolloverService,
725+
mockClient,
726+
mockAllocationService,
727+
mockMetadataDataStreamService,
728+
dataStreamAutoShardingService
729+
);
730+
String dataStreamName = randomAlphaOfLength(20);
731+
{
732+
// Make sure checkBlock returns no exception when there is no failure store block
733+
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, false);
734+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
735+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
736+
}
737+
{
738+
// Make sure checkBlock returns an exception when the failure store write index has a block
739+
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, true, randomBoolean());
740+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
741+
assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
742+
}
743+
{
744+
// Make sure checkBlock returns no exception when failure store non-write indices have a block
745+
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, true);
746+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
747+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
748+
}
749+
}
750+
751+
private ClusterState createDataStream(
752+
String dataStreamName,
753+
boolean blockOnWriteIndex,
754+
boolean blocksOnNonWriteIndices,
755+
boolean includeFailureStore,
756+
boolean blockOnFailureStoreWriteIndex,
757+
boolean blockOnFailureStoreNonWriteIndices
758+
) {
759+
ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT);
760+
Metadata.Builder metadataBuilder = Metadata.builder();
761+
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder();
762+
List<Index> indices = new ArrayList<>();
763+
int totalIndices = randomIntBetween(1, 20);
764+
for (int i = 0; i < totalIndices; i++) {
765+
Settings.Builder settingsBuilder = settings(IndexVersion.current());
766+
if ((blockOnWriteIndex && i == totalIndices - 1) || (blocksOnNonWriteIndices && i != totalIndices - 1)) {
767+
settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
768+
}
769+
final IndexMetadata backingIndexMetadata = IndexMetadata.builder(".ds-logs-ds-00000" + (i + 1))
770+
.settings(settingsBuilder)
771+
.numberOfShards(1)
772+
.numberOfReplicas(1)
773+
.build();
774+
metadataBuilder.put(backingIndexMetadata, false);
775+
indices.add(backingIndexMetadata.getIndex());
776+
clusterBlocksBuilder.addBlocks(backingIndexMetadata);
777+
}
778+
779+
DataStream.Builder dataStreamBuilder = DataStream.builder(dataStreamName, indices)
780+
.setMetadata(Map.of())
781+
.setIndexMode(randomFrom(IndexMode.values()));
782+
if (includeFailureStore) {
783+
List<Index> failureStoreIndices = new ArrayList<>();
784+
int totalFailureStoreIndices = randomIntBetween(1, 20);
785+
for (int i = 0; i < totalFailureStoreIndices; i++) {
786+
Settings.Builder settingsBuilder = settings(IndexVersion.current());
787+
if ((blockOnFailureStoreWriteIndex && i == totalFailureStoreIndices - 1)
788+
|| (blockOnFailureStoreNonWriteIndices && i != totalFailureStoreIndices - 1)) {
789+
settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
790+
}
791+
final IndexMetadata failureStoreIndexMetadata = IndexMetadata.builder(
792+
DataStream.getDefaultFailureStoreName(dataStreamName, i + 1, randomMillisUpToYear9999())
793+
).settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build();
794+
failureStoreIndices.add(failureStoreIndexMetadata.getIndex());
795+
clusterBlocksBuilder.addBlocks(failureStoreIndexMetadata);
796+
}
797+
dataStreamBuilder.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build());
798+
}
799+
clusterStateBuilder.blocks(clusterBlocksBuilder);
800+
final DataStream dataStream = dataStreamBuilder.build();
801+
metadataBuilder.put(dataStream);
802+
return clusterStateBuilder.metadata(metadataBuilder).build();
803+
}
804+
583805
private IndicesStatsResponse createIndicesStatResponse(String indexName, long totalDocs, long primariesDocs) {
584806
final CommonStats primaryStats = mock(CommonStats.class);
585807
when(primaryStats.getDocs()).thenReturn(new DocsStats(primariesDocs, 0, between(1, 10000)));

0 commit comments

Comments
 (0)