Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/122905.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 122905
summary: Updating `TransportRolloverAction.checkBlock` so that non-write-index blocks
do not prevent data stream rollover
area: Data streams
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -785,14 +786,10 @@ public void testErrorRecordingOnRetention() throws Exception {
).get();
DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo();
assertThat(dslHealthInfoOnHealthNode, is(not(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS)));
// perhaps surprisingly rollover and delete are error-ing due to the read_only block on the first generation
// index which prevents metadata updates so rolling over the data stream is also blocked (note that both indices error at
// the same time so they'll have an equal retry count - the order of the results, usually ordered by retry count,
// becomes non-deterministic, hence the dynamic matching of the index name)
assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(2));
assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(1));
DslErrorInfo errorInfo = dslHealthInfoOnHealthNode.dslErrorsInfo().get(0);
assertThat(errorInfo.retryCount(), greaterThanOrEqualTo(3));
assertThat(List.of(firstGenerationIndex, secondGenerationIndex).contains(errorInfo.indexName()), is(true));
assertThat(errorInfo.indexName(), equalTo(firstGenerationIndex));
});

GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000))
Expand All @@ -808,15 +805,12 @@ public void testErrorRecordingOnRetention() throws Exception {
assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT));
assertThat(
dslIndicator.symptom(),
is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle")
is("A backing index has repeatedly encountered errors whilst trying to advance in its lifecycle")
);

Diagnosis diagnosis = dslIndicator.diagnosisList().get(0);
assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF));
assertThat(
diagnosis.affectedResources().get(0).getValues(),
containsInAnyOrder(firstGenerationIndex, secondGenerationIndex)
);
assertThat(diagnosis.affectedResources().get(0).getValues(), contains(firstGenerationIndex));
}

// let's mark the index as writeable and make sure it's deleted and the error store is empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,33 @@ protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState
.build(),
IndicesOptions.GatekeeperOptions.DEFAULT
);

return state.blocks()
.indicesBlockedException(
ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request)
);
ResolvedExpression resolvedRolloverTarget = SelectorResolver.parseExpression(request.getRolloverTarget(), request.indicesOptions());
final IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(resolvedRolloverTarget.resource());
final String[] indicesToCheck;
if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
DataStream dataStream = (DataStream) indexAbstraction;
boolean targetFailureStore = resolvedRolloverTarget.selector() != null
&& resolvedRolloverTarget.selector().shouldIncludeFailures();
if (targetFailureStore == false) {
assert dataStream.getWriteIndex() != null : dataStream.getName() + " is a data stream but has no write index";
assert dataStream.getWriteIndex().getName() != null
: dataStream.getName() + " is a data stream but the write index is null";
indicesToCheck = new String[] { dataStream.getWriteIndex().getName() };
} else if (dataStream.getWriteFailureIndex() != null) {
assert dataStream.getWriteFailureIndex().getName() != null
: "the write index for the data stream " + dataStream.getName() + " is null";
indicesToCheck = new String[] { dataStream.getWriteFailureIndex().getName() };
} else {
indicesToCheck = null;
}
} else {
indicesToCheck = indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request);
}
if (indicesToCheck == null) {
return null;
} else {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indicesToCheck);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
Expand All @@ -19,11 +20,14 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -43,6 +47,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.cache.query.QueryCacheStats;
Expand Down Expand Up @@ -578,6 +583,223 @@ public void testRolloverAliasToDataStreamFails() throws Exception {
assertThat(illegalStateException.getMessage(), containsString("Aliases to data streams cannot be rolled over."));
}

public void testCheckBlockForIndices() {
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
mock(TransportService.class),
mockClusterService,
mockThreadPool,
mockActionFilters,
mockIndexNameExpressionResolver,
rolloverService,
mockClient,
mockAllocationService,
mockMetadataDataStreamService,
dataStreamAutoShardingService
);
final IndexMetadata.Builder indexMetadata1 = IndexMetadata.builder("my-index-1")
.putAlias(AliasMetadata.builder("my-alias").writeIndex(true).build())
.settings(settings(IndexVersion.current()))
.numberOfShards(1)
.numberOfReplicas(1);
final IndexMetadata indexMetadata2 = IndexMetadata.builder("my-index-2")
.settings(settings(IndexVersion.current()).put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(indexMetadata1).put(indexMetadata2, false))
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata2))
.build();
{
RolloverRequest rolloverRequest = new RolloverRequest("my-alias", "my-new-index");
when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
new String[] { "my-index-1" }
);
assertNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
}
{
RolloverRequest rolloverRequest = new RolloverRequest("my-index-2", "my-new-index");
when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
new String[] { "my-index-2" }
);
assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
}
}

public void testCheckBlockForDataStreams() {
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
mock(TransportService.class),
mockClusterService,
mockThreadPool,
mockActionFilters,
mockIndexNameExpressionResolver,
rolloverService,
mockClient,
mockAllocationService,
mockMetadataDataStreamService,
dataStreamAutoShardingService
);
String dataStreamName = randomAlphaOfLength(20);
{
// First, make sure checkBlock returns null when there are no blocks
final ClusterState clusterState = createDataStream(
dataStreamName,
false,
false,
randomBoolean(),
randomBoolean(),
randomBoolean()
);
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
}
{
// Make sure checkBlock returns null when indices other than the write index have blocks
final ClusterState clusterState = createDataStream(
dataStreamName,
false,
true,
randomBoolean(),
randomBoolean(),
randomBoolean()
);
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
}
{
// Make sure checkBlock returns null when indices other than the write index have blocks and we use "::data"
final ClusterState clusterState = createDataStream(
dataStreamName,
false,
true,
randomBoolean(),
randomBoolean(),
randomBoolean()
);
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
}
{
// Make sure checkBlock returns an exception when the write index has a block
ClusterState clusterState = createDataStream(
dataStreamName,
true,
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
if (randomBoolean()) {
rolloverRequest.setIndicesOptions(IndicesOptions.lenientExpandOpenNoSelectors());
}
ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
assertNotNull(e);
}
{
// Make sure checkBlock returns an exception when the write index has a block and we use "::data"
ClusterState clusterState = createDataStream(
dataStreamName,
true,
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
assertNotNull(e);
}
}

public void testCheckBlockForDataStreamFailureStores() {
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
mock(TransportService.class),
mockClusterService,
mockThreadPool,
mockActionFilters,
mockIndexNameExpressionResolver,
rolloverService,
mockClient,
mockAllocationService,
mockMetadataDataStreamService,
dataStreamAutoShardingService
);
String dataStreamName = randomAlphaOfLength(20);
{
// Make sure checkBlock returns no exception when there is no failure store block
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, false);
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
}
{
// Make sure checkBlock returns an exception when the failure store write index has a block
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, true, randomBoolean());
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
}
{
// Make sure checkBlock returns no exception when failure store non-write indices have a block
ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, true);
RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
}
}

private ClusterState createDataStream(
String dataStreamName,
boolean blockOnWriteIndex,
boolean blocksOnNonWriteIndices,
boolean includeFailureStore,
boolean blockOnFailureStoreWriteIndex,
boolean blockOnFailureStoreNonWriteIndices
) {
ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT);
Metadata.Builder metadataBuilder = Metadata.builder();
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder();
List<Index> indices = new ArrayList<>();
int totalIndices = randomIntBetween(1, 20);
for (int i = 0; i < totalIndices; i++) {
Settings.Builder settingsBuilder = settings(IndexVersion.current());
if ((blockOnWriteIndex && i == totalIndices - 1) || (blocksOnNonWriteIndices && i != totalIndices - 1)) {
settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
}
final IndexMetadata backingIndexMetadata = IndexMetadata.builder(".ds-logs-ds-00000" + (i + 1))
.settings(settingsBuilder)
.numberOfShards(1)
.numberOfReplicas(1)
.build();
metadataBuilder.put(backingIndexMetadata, false);
indices.add(backingIndexMetadata.getIndex());
clusterBlocksBuilder.addBlocks(backingIndexMetadata);
}

DataStream.Builder dataStreamBuilder = DataStream.builder(dataStreamName, indices)
.setMetadata(Map.of())
.setIndexMode(randomFrom(IndexMode.values()));
if (includeFailureStore) {
List<Index> failureStoreIndices = new ArrayList<>();
int totalFailureStoreIndices = randomIntBetween(1, 20);
for (int i = 0; i < totalFailureStoreIndices; i++) {
Settings.Builder settingsBuilder = settings(IndexVersion.current());
if ((blockOnFailureStoreWriteIndex && i == totalFailureStoreIndices - 1)
|| (blockOnFailureStoreNonWriteIndices && i != totalFailureStoreIndices - 1)) {
settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
}
final IndexMetadata failureStoreIndexMetadata = IndexMetadata.builder(
DataStream.getDefaultFailureStoreName(dataStreamName, i + 1, randomMillisUpToYear9999())
).settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build();
failureStoreIndices.add(failureStoreIndexMetadata.getIndex());
clusterBlocksBuilder.addBlocks(failureStoreIndexMetadata);
}
dataStreamBuilder.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build());
}
clusterStateBuilder.blocks(clusterBlocksBuilder);
final DataStream dataStream = dataStreamBuilder.build();
metadataBuilder.put(dataStream);
return clusterStateBuilder.metadata(metadataBuilder).build();
}

private IndicesStatsResponse createIndicesStatResponse(String indexName, long totalDocs, long primariesDocs) {
final CommonStats primaryStats = mock(CommonStats.class);
when(primaryStats.getDocs()).thenReturn(new DocsStats(primariesDocs, 0, between(1, 10000)));
Expand Down