Skip to content

Commit 658af6b

Browse files
masseyke authored and afoucret committed
Updating TransportRolloverAction.checkBlock so that non-write-index blocks do not prevent data stream rollover (elastic#122905)
1 parent b8b3a08 commit 658af6b

File tree

4 files changed

+260
-17
lines changed

4 files changed

+260
-17
lines changed

docs/changelog/122905.yaml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
pr: 122905
2+
summary: Updating `TransportRolloverAction.checkBlock` so that non-write-index blocks
3+
do not prevent data stream rollover
4+
area: Data streams
5+
type: bug
6+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 5 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -104,6 +104,7 @@
104104
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
105105
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
106106
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
107+
import static org.hamcrest.Matchers.contains;
107108
import static org.hamcrest.Matchers.containsInAnyOrder;
108109
import static org.hamcrest.Matchers.containsString;
109110
import static org.hamcrest.Matchers.equalTo;
@@ -785,14 +786,10 @@ public void testErrorRecordingOnRetention() throws Exception {
785786
).get();
786787
DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo();
787788
assertThat(dslHealthInfoOnHealthNode, is(not(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS)));
788-
// perhaps surprisingly rollover and delete are error-ing due to the read_only block on the first generation
789-
// index which prevents metadata updates so rolling over the data stream is also blocked (note that both indices error at
790-
// the same time so they'll have an equal retry count - the order becomes of the results, usually ordered by retry count,
791-
// becomes non deterministic, hence the dynamic matching of index name)
792-
assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(2));
789+
assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(1));
793790
DslErrorInfo errorInfo = dslHealthInfoOnHealthNode.dslErrorsInfo().get(0);
794791
assertThat(errorInfo.retryCount(), greaterThanOrEqualTo(3));
795-
assertThat(List.of(firstGenerationIndex, secondGenerationIndex).contains(errorInfo.indexName()), is(true));
792+
assertThat(errorInfo.indexName(), equalTo(firstGenerationIndex));
796793
});
797794

798795
GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000))
@@ -808,15 +805,12 @@ public void testErrorRecordingOnRetention() throws Exception {
808805
assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT));
809806
assertThat(
810807
dslIndicator.symptom(),
811-
is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle")
808+
is("A backing index has repeatedly encountered errors whilst trying to advance in its lifecycle")
812809
);
813810

814811
Diagnosis diagnosis = dslIndicator.diagnosisList().get(0);
815812
assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF));
816-
assertThat(
817-
diagnosis.affectedResources().get(0).getValues(),
818-
containsInAnyOrder(firstGenerationIndex, secondGenerationIndex)
819-
);
813+
assertThat(diagnosis.affectedResources().get(0).getValues(), contains(firstGenerationIndex));
820814
}
821815

822816
// let's mark the index as writeable and make sure it's deleted and the error store is empty

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 27 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -154,12 +154,33 @@ protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState
154154
.build(),
155155
IndicesOptions.GatekeeperOptions.DEFAULT
156156
);
157-
158-
return state.blocks()
159-
.indicesBlockedException(
160-
ClusterBlockLevel.METADATA_WRITE,
161-
indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request)
162-
);
157+
ResolvedExpression resolvedRolloverTarget = SelectorResolver.parseExpression(request.getRolloverTarget(), request.indicesOptions());
158+
final IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(resolvedRolloverTarget.resource());
159+
final String[] indicesToCheck;
160+
if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
161+
DataStream dataStream = (DataStream) indexAbstraction;
162+
boolean targetFailureStore = resolvedRolloverTarget.selector() != null
163+
&& resolvedRolloverTarget.selector().shouldIncludeFailures();
164+
if (targetFailureStore == false) {
165+
assert dataStream.getWriteIndex() != null : dataStream.getName() + " is a data stream but has no write index";
166+
assert dataStream.getWriteIndex().getName() != null
167+
: dataStream.getName() + " is a data stream but the write index is null";
168+
indicesToCheck = new String[] { dataStream.getWriteIndex().getName() };
169+
} else if (dataStream.getWriteFailureIndex() != null) {
170+
assert dataStream.getWriteFailureIndex().getName() != null
171+
: "the write index for the data stream " + dataStream.getName() + " is null";
172+
indicesToCheck = new String[] { dataStream.getWriteFailureIndex().getName() };
173+
} else {
174+
indicesToCheck = null;
175+
}
176+
} else {
177+
indicesToCheck = indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request);
178+
}
179+
if (indicesToCheck == null) {
180+
return null;
181+
} else {
182+
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indicesToCheck);
183+
}
163184
}
164185

165186
@Override

server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

Lines changed: 222 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ActionRequest;
14+
import org.elasticsearch.action.IndicesRequest;
1415
import org.elasticsearch.action.admin.indices.stats.CommonStats;
1516
import org.elasticsearch.action.admin.indices.stats.IndexStats;
1617
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@@ -19,11 +20,14 @@
1920
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2021
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
2122
import org.elasticsearch.action.support.ActionFilters;
23+
import org.elasticsearch.action.support.IndicesOptions;
2224
import org.elasticsearch.action.support.PlainActionFuture;
2325
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2426
import org.elasticsearch.client.internal.Client;
2527
import org.elasticsearch.cluster.ClusterName;
2628
import org.elasticsearch.cluster.ClusterState;
29+
import org.elasticsearch.cluster.block.ClusterBlockException;
30+
import org.elasticsearch.cluster.block.ClusterBlocks;
2731
import org.elasticsearch.cluster.metadata.AliasMetadata;
2832
import org.elasticsearch.cluster.metadata.DataStream;
2933
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -43,6 +47,7 @@
4347
import org.elasticsearch.common.settings.Settings;
4448
import org.elasticsearch.common.unit.ByteSizeValue;
4549
import org.elasticsearch.core.TimeValue;
50+
import org.elasticsearch.index.Index;
4651
import org.elasticsearch.index.IndexMode;
4752
import org.elasticsearch.index.IndexVersion;
4853
import org.elasticsearch.index.cache.query.QueryCacheStats;
@@ -578,6 +583,223 @@ public void testRolloverAliasToDataStreamFails() throws Exception {
578583
assertThat(illegalStateException.getMessage(), containsString("Aliases to data streams cannot be rolled over."));
579584
}
580585

586+
public void testCheckBlockForIndices() {
587+
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
588+
mock(TransportService.class),
589+
mockClusterService,
590+
mockThreadPool,
591+
mockActionFilters,
592+
mockIndexNameExpressionResolver,
593+
rolloverService,
594+
mockClient,
595+
mockAllocationService,
596+
mockMetadataDataStreamService,
597+
dataStreamAutoShardingService
598+
);
599+
final IndexMetadata.Builder indexMetadata1 = IndexMetadata.builder("my-index-1")
600+
.putAlias(AliasMetadata.builder("my-alias").writeIndex(true).build())
601+
.settings(settings(IndexVersion.current()))
602+
.numberOfShards(1)
603+
.numberOfReplicas(1);
604+
final IndexMetadata indexMetadata2 = IndexMetadata.builder("my-index-2")
605+
.settings(settings(IndexVersion.current()).put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true))
606+
.numberOfShards(1)
607+
.numberOfReplicas(1)
608+
.build();
609+
final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT)
610+
.metadata(Metadata.builder().put(indexMetadata1).put(indexMetadata2, false))
611+
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata2))
612+
.build();
613+
{
614+
RolloverRequest rolloverRequest = new RolloverRequest("my-alias", "my-new-index");
615+
when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
616+
new String[] { "my-index-1" }
617+
);
618+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
619+
}
620+
{
621+
RolloverRequest rolloverRequest = new RolloverRequest("my-index-2", "my-new-index");
622+
when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
623+
new String[] { "my-index-2" }
624+
);
625+
assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
626+
}
627+
}
628+
629+
public void testCheckBlockForDataStreams() {
630+
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
631+
mock(TransportService.class),
632+
mockClusterService,
633+
mockThreadPool,
634+
mockActionFilters,
635+
mockIndexNameExpressionResolver,
636+
rolloverService,
637+
mockClient,
638+
mockAllocationService,
639+
mockMetadataDataStreamService,
640+
dataStreamAutoShardingService
641+
);
642+
String dataStreamName = randomAlphaOfLength(20);
643+
{
644+
// First, make sure checkBlock returns null when there are no blocks
645+
final ClusterState clusterState = createDataStream(
646+
dataStreamName,
647+
false,
648+
false,
649+
randomBoolean(),
650+
randomBoolean(),
651+
randomBoolean()
652+
);
653+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
654+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
655+
}
656+
{
657+
// Make sure checkBlock returns null when indices other than the write index have blocks
658+
final ClusterState clusterState = createDataStream(
659+
dataStreamName,
660+
false,
661+
true,
662+
randomBoolean(),
663+
randomBoolean(),
664+
randomBoolean()
665+
);
666+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
667+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
668+
}
669+
{
670+
// Make sure checkBlock returns null when indices other than the write index have blocks and we use "::data"
671+
final ClusterState clusterState = createDataStream(
672+
dataStreamName,
673+
false,
674+
true,
675+
randomBoolean(),
676+
randomBoolean(),
677+
randomBoolean()
678+
);
679+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
680+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
681+
}
682+
{
683+
// Make sure checkBlock returns an exception when the write index has a block
684+
ClusterState clusterState = createDataStream(
685+
dataStreamName,
686+
true,
687+
randomBoolean(),
688+
randomBoolean(),
689+
randomBoolean(),
690+
randomBoolean()
691+
);
692+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
693+
if (randomBoolean()) {
694+
rolloverRequest.setIndicesOptions(IndicesOptions.lenientExpandOpenNoSelectors());
695+
}
696+
ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
697+
assertNotNull(e);
698+
}
699+
{
700+
// Make sure checkBlock returns an exception when the write index has a block and we use "::data"
701+
ClusterState clusterState = createDataStream(
702+
dataStreamName,
703+
true,
704+
randomBoolean(),
705+
randomBoolean(),
706+
randomBoolean(),
707+
randomBoolean()
708+
);
709+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
710+
ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
711+
assertNotNull(e);
712+
}
713+
}
714+
715+
public void testCheckBlockForDataStreamFailureStores() {
716+
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
717+
mock(TransportService.class),
718+
mockClusterService,
719+
mockThreadPool,
720+
mockActionFilters,
721+
mockIndexNameExpressionResolver,
722+
rolloverService,
723+
mockClient,
724+
mockAllocationService,
725+
mockMetadataDataStreamService,
726+
dataStreamAutoShardingService
727+
);
728+
String dataStreamName = randomAlphaOfLength(20);
729+
{
730+
// Make sure checkBlock returns no exception when there is no failure store block
731+
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, false);
732+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
733+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
734+
}
735+
{
736+
// Make sure checkBlock returns an exception when the failure store write index has a block
737+
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, true, randomBoolean());
738+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
739+
assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
740+
}
741+
{
742+
// Make sure checkBlock returns no exception when failure store non-write indices have a block
743+
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, true);
744+
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
745+
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
746+
}
747+
}
748+
749+
private ClusterState createDataStream(
750+
String dataStreamName,
751+
boolean blockOnWriteIndex,
752+
boolean blocksOnNonWriteIndices,
753+
boolean includeFailureStore,
754+
boolean blockOnFailureStoreWriteIndex,
755+
boolean blockOnFailureStoreNonWriteIndices
756+
) {
757+
ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT);
758+
Metadata.Builder metadataBuilder = Metadata.builder();
759+
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder();
760+
List<Index> indices = new ArrayList<>();
761+
int totalIndices = randomIntBetween(1, 20);
762+
for (int i = 0; i < totalIndices; i++) {
763+
Settings.Builder settingsBuilder = settings(IndexVersion.current());
764+
if ((blockOnWriteIndex && i == totalIndices - 1) || (blocksOnNonWriteIndices && i != totalIndices - 1)) {
765+
settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
766+
}
767+
final IndexMetadata backingIndexMetadata = IndexMetadata.builder(".ds-logs-ds-00000" + (i + 1))
768+
.settings(settingsBuilder)
769+
.numberOfShards(1)
770+
.numberOfReplicas(1)
771+
.build();
772+
metadataBuilder.put(backingIndexMetadata, false);
773+
indices.add(backingIndexMetadata.getIndex());
774+
clusterBlocksBuilder.addBlocks(backingIndexMetadata);
775+
}
776+
777+
DataStream.Builder dataStreamBuilder = DataStream.builder(dataStreamName, indices)
778+
.setMetadata(Map.of())
779+
.setIndexMode(randomFrom(IndexMode.values()));
780+
if (includeFailureStore) {
781+
List<Index> failureStoreIndices = new ArrayList<>();
782+
int totalFailureStoreIndices = randomIntBetween(1, 20);
783+
for (int i = 0; i < totalFailureStoreIndices; i++) {
784+
Settings.Builder settingsBuilder = settings(IndexVersion.current());
785+
if ((blockOnFailureStoreWriteIndex && i == totalFailureStoreIndices - 1)
786+
|| (blockOnFailureStoreNonWriteIndices && i != totalFailureStoreIndices - 1)) {
787+
settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
788+
}
789+
final IndexMetadata failureStoreIndexMetadata = IndexMetadata.builder(
790+
DataStream.getDefaultFailureStoreName(dataStreamName, i + 1, randomMillisUpToYear9999())
791+
).settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build();
792+
failureStoreIndices.add(failureStoreIndexMetadata.getIndex());
793+
clusterBlocksBuilder.addBlocks(failureStoreIndexMetadata);
794+
}
795+
dataStreamBuilder.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build());
796+
}
797+
clusterStateBuilder.blocks(clusterBlocksBuilder);
798+
final DataStream dataStream = dataStreamBuilder.build();
799+
metadataBuilder.put(dataStream);
800+
return clusterStateBuilder.metadata(metadataBuilder).build();
801+
}
802+
581803
private IndicesStatsResponse createIndicesStatResponse(String indexName, long totalDocs, long primariesDocs) {
582804
final CommonStats primaryStats = mock(CommonStats.class);
583805
when(primaryStats.getDocs()).thenReturn(new DocsStats(primariesDocs, 0, between(1, 10000)));

0 commit comments

Comments
 (0)