Skip to content

Commit 30a706a

Browse files
authored
Revert "Remove INDEX_REFRESH_BLOCK after index becomes searchable (#120807)" (#121427)
This reverts commit ae0f1a6. The refresh block would be removed in a subsequent cluster state update instead of removing it immediately after an index is ready for searches. Closes ES-10697
1 parent c570950 commit 30a706a

File tree

4 files changed

+8
-217
lines changed

4 files changed

+8
-217
lines changed

docs/changelog/120807.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2424
import org.elasticsearch.cluster.ClusterStateTaskListener;
2525
import org.elasticsearch.cluster.NotMasterException;
26-
import org.elasticsearch.cluster.block.ClusterBlock;
27-
import org.elasticsearch.cluster.block.ClusterBlocks;
2826
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
2927
import org.elasticsearch.cluster.metadata.IndexMetadata;
3028
import org.elasticsearch.cluster.metadata.Metadata;
@@ -72,7 +70,6 @@
7270

7371
import static org.apache.logging.log4j.Level.DEBUG;
7472
import static org.apache.logging.log4j.Level.ERROR;
75-
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;
7673
import static org.elasticsearch.cluster.service.MasterService.isPublishFailureException;
7774
import static org.elasticsearch.core.Strings.format;
7875

@@ -622,7 +619,6 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
622619
List<TaskContext<StartedShardUpdateTask>> tasksToBeApplied = new ArrayList<>();
623620
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(batchExecutionContext.taskContexts().size());
624621
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
625-
Set<Index> indicesWithUnpromotableShardsStarted = null;
626622
final Map<Index, ClusterStateTimeRanges> updatedTimestampRanges = new HashMap<>();
627623
final ClusterState initialState = batchExecutionContext.initialState();
628624
for (var taskContext : batchExecutionContext.taskContexts()) {
@@ -741,14 +737,6 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
741737
new ClusterStateTimeRanges(newTimestampMillisRange, newEventIngestedMillisRange)
742738
);
743739
}
744-
745-
if (matched.isPromotableToPrimary() == false
746-
&& initialState.blocks().hasIndexBlock(index.getName(), INDEX_REFRESH_BLOCK)) {
747-
if (indicesWithUnpromotableShardsStarted == null) {
748-
indicesWithUnpromotableShardsStarted = new HashSet<>();
749-
}
750-
indicesWithUnpromotableShardsStarted.add(index);
751-
}
752740
}
753741
}
754742
}
@@ -772,10 +760,7 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
772760
maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build();
773761
}
774762

775-
maybeUpdatedState = maybeRemoveIndexRefreshBlocks(maybeUpdatedState, indicesWithUnpromotableShardsStarted);
776-
777763
assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);
778-
assert assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(maybeUpdatedState);
779764

780765
for (final var taskContext : tasksToBeApplied) {
781766
final var task = taskContext.getTask();
@@ -791,36 +776,6 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
791776
return maybeUpdatedState;
792777
}
793778

794-
private static ClusterState maybeRemoveIndexRefreshBlocks(
795-
ClusterState clusterState,
796-
@Nullable Set<Index> indicesWithUnpromotableShardsStarted
797-
) {
798-
// The provided cluster state must include the newly STARTED unpromotable shards
799-
if (indicesWithUnpromotableShardsStarted == null) {
800-
return clusterState;
801-
}
802-
803-
ClusterBlocks.Builder clusterBlocksBuilder = null;
804-
for (Index indexWithUnpromotableShardsStarted : indicesWithUnpromotableShardsStarted) {
805-
String indexName = indexWithUnpromotableShardsStarted.getName();
806-
assert clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK) : indexWithUnpromotableShardsStarted;
807-
808-
var indexRoutingTable = clusterState.routingTable().index(indexWithUnpromotableShardsStarted);
809-
if (indexRoutingTable.readyForSearch()) {
810-
if (clusterBlocksBuilder == null) {
811-
clusterBlocksBuilder = ClusterBlocks.builder(clusterState.blocks());
812-
}
813-
clusterBlocksBuilder.removeIndexBlock(indexName, INDEX_REFRESH_BLOCK);
814-
}
815-
}
816-
817-
if (clusterBlocksBuilder == null) {
818-
return clusterState;
819-
}
820-
821-
return ClusterState.builder(clusterState).blocks(clusterBlocksBuilder).build();
822-
}
823-
824779
private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) {
825780
for (Map.Entry<String, IndexRoutingTable> cursor : clusterState.getRoutingTable().getIndicesRouting().entrySet()) {
826781
assert cursor.getValue().allPrimaryShardsActive() == false
@@ -844,16 +799,6 @@ private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterSt
844799
return true;
845800
}
846801

847-
private static boolean assertRefreshBlockIsNotPresentWhenTheIndexIsSearchable(ClusterState clusterState) {
848-
for (Map.Entry<String, Set<ClusterBlock>> indexBlock : clusterState.blocks().indices().entrySet()) {
849-
if (indexBlock.getValue().contains(INDEX_REFRESH_BLOCK)) {
850-
assert clusterState.routingTable().index(indexBlock.getKey()).readyForSearch() == false
851-
: "Index [" + indexBlock.getKey() + "] is searchable but has an INDEX_REFRESH_BLOCK";
852-
}
853-
}
854-
return true;
855-
}
856-
857802
@Override
858803
public void clusterStatePublished(ClusterState newClusterState) {
859804
rerouteService.reroute(

server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java

Lines changed: 0 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,34 @@
1212
import org.elasticsearch.TransportVersions;
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.support.ActionTestUtils;
15-
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
1615
import org.elasticsearch.cluster.ClusterState;
1716
import org.elasticsearch.cluster.ESAllocationTestCase;
1817
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
1918
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardUpdateTask;
20-
import org.elasticsearch.cluster.block.ClusterBlocks;
2119
import org.elasticsearch.cluster.metadata.IndexMetadata;
2220
import org.elasticsearch.cluster.metadata.Metadata;
23-
import org.elasticsearch.cluster.routing.AllocationId;
24-
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2521
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
26-
import org.elasticsearch.cluster.routing.RoutingTable;
2722
import org.elasticsearch.cluster.routing.ShardRouting;
2823
import org.elasticsearch.cluster.routing.ShardRoutingState;
2924
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3025
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
3126
import org.elasticsearch.common.Priority;
3227
import org.elasticsearch.common.settings.Settings;
33-
import org.elasticsearch.core.Tuple;
3428
import org.elasticsearch.index.shard.IndexLongFieldRange;
3529
import org.elasticsearch.index.shard.ShardId;
3630
import org.elasticsearch.index.shard.ShardLongFieldRange;
3731

3832
import java.util.List;
39-
import java.util.stream.Collectors;
4033
import java.util.stream.IntStream;
4134
import java.util.stream.Stream;
4235

4336
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
4437
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
4538
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas;
4639
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard;
47-
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_REFRESH_BLOCK;
4840
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
4941
import static org.hamcrest.Matchers.equalTo;
5042
import static org.hamcrest.Matchers.is;
51-
import static org.hamcrest.Matchers.notNullValue;
5243
import static org.hamcrest.Matchers.sameInstance;
5344

5445
public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase {
@@ -488,114 +479,6 @@ public void testExpandsTimestampRangeForReplica() throws Exception {
488479
assertThat(latestIndexMetadata.getEventIngestedRange(), sameInstance(IndexLongFieldRange.UNKNOWN));
489480
}
490481

491-
public void testIndexRefreshBlockIsClearedOnceTheIndexIsReadyToBeSearched() throws Exception {
492-
final var indexName = "test";
493-
final var numberOfShards = randomIntBetween(1, 4);
494-
final var numberOfReplicas = randomIntBetween(1, 4);
495-
var clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicasWithState(
496-
new String[] { indexName },
497-
numberOfShards,
498-
ShardRouting.Role.INDEX_ONLY,
499-
IntStream.range(0, numberOfReplicas)
500-
.mapToObj(unused -> Tuple.tuple(ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY))
501-
.toList()
502-
);
503-
504-
clusterState = ClusterState.builder(clusterState)
505-
.metadata(Metadata.builder(clusterState.metadata()).put(withActiveShardsInSyncAllocationIds(clusterState, indexName)))
506-
.blocks(ClusterBlocks.builder(clusterState.blocks()).addIndexBlock(indexName, INDEX_REFRESH_BLOCK))
507-
.build();
508-
509-
while (clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK)) {
510-
clusterState = maybeInitializeUnassignedReplicaShard(clusterState);
511-
512-
final IndexMetadata indexMetadata = clusterState.metadata().index(indexName);
513-
514-
final var initializingReplicaShardOpt = clusterState.routingTable()
515-
.allShards()
516-
.filter(shardRouting -> shardRouting.isPromotableToPrimary() == false)
517-
.filter(shardRouting -> shardRouting.state().equals(ShardRoutingState.INITIALIZING))
518-
.findFirst();
519-
520-
assertThat(clusterState.routingTable().allShards().toList().toString(), initializingReplicaShardOpt.isPresent(), is(true));
521-
522-
var initializingReplicaShard = initializingReplicaShardOpt.get();
523-
524-
final var shardId = initializingReplicaShard.shardId();
525-
final var primaryTerm = indexMetadata.primaryTerm(shardId.id());
526-
final var replicaAllocationId = initializingReplicaShard.allocationId().getId();
527-
final var task = new StartedShardUpdateTask(
528-
new StartedShardEntry(
529-
shardId,
530-
replicaAllocationId,
531-
primaryTerm,
532-
"test",
533-
ShardLongFieldRange.UNKNOWN,
534-
ShardLongFieldRange.UNKNOWN
535-
),
536-
createTestListener()
537-
);
538-
539-
final var resultingState = executeTasks(clusterState, List.of(task));
540-
assertNotSame(clusterState, resultingState);
541-
542-
clusterState = resultingState;
543-
}
544-
545-
var indexRoutingTable = clusterState.routingTable().index(indexName);
546-
assertThat(indexRoutingTable.readyForSearch(), is(true));
547-
for (int i = 0; i < numberOfShards; i++) {
548-
var shardRoutingTable = indexRoutingTable.shard(i);
549-
assertThat(shardRoutingTable, is(notNullValue()));
550-
// Ensure that at least one unpromotable shard is either STARTED or RELOCATING
551-
assertThat(shardRoutingTable.unpromotableShards().isEmpty(), is(false));
552-
}
553-
assertThat(clusterState.blocks().hasIndexBlock(indexName, INDEX_REFRESH_BLOCK), is(false));
554-
}
555-
556-
private static ClusterState maybeInitializeUnassignedReplicaShard(ClusterState clusterState) {
557-
var unassignedShardRoutingOpt = clusterState.routingTable()
558-
.allShards()
559-
.filter(shardRouting -> shardRouting.state().equals(ShardRoutingState.UNASSIGNED))
560-
.findFirst();
561-
562-
if (unassignedShardRoutingOpt.isEmpty()) {
563-
return clusterState;
564-
}
565-
566-
var unassignedShardRouting = unassignedShardRoutingOpt.get();
567-
var initializedShard = unassignedShardRouting.initialize(randomUUID(), null, 1);
568-
569-
RoutingTable routingTable = clusterState.routingTable();
570-
IndexRoutingTable indexRoutingTable = routingTable.index(unassignedShardRouting.getIndexName());
571-
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
572-
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
573-
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId);
574-
for (int copy = 0; copy < shardRoutingTable.size(); copy++) {
575-
ShardRouting shardRouting = shardRoutingTable.shard(copy);
576-
newIndexRoutingTable.addShard(shardRouting == unassignedShardRouting ? initializedShard : shardRouting);
577-
}
578-
}
579-
routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
580-
return ClusterState.builder(clusterState).routingTable(routingTable).build();
581-
}
582-
583-
private static IndexMetadata.Builder withActiveShardsInSyncAllocationIds(ClusterState clusterState, String indexName) {
584-
IndexMetadata.Builder indexMetadataBuilder = new IndexMetadata.Builder(clusterState.metadata().index(indexName));
585-
var indexRoutingTable = clusterState.routingTable().index(indexName);
586-
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable.allShards().toList()) {
587-
indexMetadataBuilder.putInSyncAllocationIds(
588-
indexShardRoutingTable.shardId().getId(),
589-
indexShardRoutingTable.activeShards()
590-
.stream()
591-
.map(ShardRouting::allocationId)
592-
.map(AllocationId::getId)
593-
.collect(Collectors.toSet())
594-
);
595-
}
596-
return indexMetadataBuilder;
597-
}
598-
599482
private ClusterState executeTasks(final ClusterState state, final List<StartedShardUpdateTask> tasks) throws Exception {
600483
return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, executor, tasks);
601484
}

test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -363,34 +363,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(
363363
int numberOfShards,
364364
List<ShardRouting.Role> replicaRoles
365365
) {
366-
return stateWithAssignedPrimariesAndReplicasWithState(
367-
indices,
368-
numberOfShards,
369-
replicaRoles.stream().map(role -> Tuple.tuple(ShardRoutingState.STARTED, role)).toList()
370-
);
371-
}
372-
373-
/**
374-
* Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED.
375-
*/
376-
public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
377-
String[] indices,
378-
int numberOfShards,
379-
List<Tuple<ShardRoutingState, ShardRouting.Role>> replicaRoleAndStates
380-
) {
381-
return stateWithAssignedPrimariesAndReplicasWithState(indices, numberOfShards, ShardRouting.Role.DEFAULT, replicaRoleAndStates);
382-
}
383-
384-
/**
385-
* Creates cluster state with several indexes, shards and replicas (with given roles and state) and all primary shards STARTED.
386-
*/
387-
public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
388-
String[] indices,
389-
int numberOfShards,
390-
ShardRouting.Role primaryRole,
391-
List<Tuple<ShardRoutingState, ShardRouting.Role>> replicasStateAndRoles
392-
) {
393-
int numberOfDataNodes = replicasStateAndRoles.size() + 1;
366+
int numberOfDataNodes = replicaRoles.size() + 1;
394367
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
395368
for (int i = 0; i < numberOfDataNodes + 1; i++) {
396369
final DiscoveryNode node = newNode(i);
@@ -410,7 +383,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
410383
for (String index : indices) {
411384
IndexMetadata indexMetadata = IndexMetadata.builder(index)
412385
.settings(
413-
indexSettings(IndexVersion.current(), numberOfShards, replicasStateAndRoles.size()).put(
386+
indexSettings(IndexVersion.current(), numberOfShards, replicaRoles.size()).put(
414387
SETTING_CREATION_DATE,
415388
System.currentTimeMillis()
416389
)
@@ -424,19 +397,14 @@ public static ClusterState stateWithAssignedPrimariesAndReplicasWithState(
424397
final ShardId shardId = new ShardId(index, "_na_", i);
425398
IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId);
426399
indexShardRoutingBuilder.addShard(
427-
shardRoutingBuilder(index, i, newNode(0).getId(), true, ShardRoutingState.STARTED).withRole(primaryRole).build()
400+
TestShardRouting.newShardRouting(index, i, newNode(0).getId(), null, true, ShardRoutingState.STARTED)
428401
);
429-
for (int replica = 0; replica < replicasStateAndRoles.size(); replica++) {
430-
var replicaStateAndRole = replicasStateAndRoles.get(replica);
431-
ShardRoutingState shardRoutingState = replicaStateAndRole.v1();
432-
String currentNodeId = shardRoutingState.equals(ShardRoutingState.UNASSIGNED) ? null : newNode(replica + 1).getId();
433-
var shardRoutingBuilder = shardRoutingBuilder(index, i, currentNodeId, false, shardRoutingState).withRole(
434-
replicaStateAndRole.v2()
402+
for (int replica = 0; replica < replicaRoles.size(); replica++) {
403+
indexShardRoutingBuilder.addShard(
404+
shardRoutingBuilder(index, i, newNode(replica + 1).getId(), false, ShardRoutingState.STARTED).withRole(
405+
replicaRoles.get(replica)
406+
).build()
435407
);
436-
if (shardRoutingState.equals(ShardRoutingState.RELOCATING)) {
437-
shardRoutingBuilder.withRelocatingNodeId(DiscoveryNodeUtils.create("relocating_" + replica).getId());
438-
}
439-
indexShardRoutingBuilder.addShard(shardRoutingBuilder.build());
440408
}
441409
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder);
442410
}

0 commit comments

Comments
 (0)