Skip to content

Commit 6ae508a

Browse files
committed
Feedback and tests
1 parent 77a3ae2 commit 6ae508a

File tree

4 files changed

+357
-9
lines changed

4 files changed

+357
-9
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ public void accept(ActionListener<Response> listener) {
100100
assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";
101101

102102
final ClusterState clusterState = clusterService.state();
103-
final ProjectMetadata project = projectResolver.getProjectMetadata(clusterState);
104103
final ProjectState projectState = projectResolver.getProjectState(clusterState);
104+
final ProjectMetadata project = projectState.metadata();
105105
final List<ShardId> shards = shards(request, projectState);
106106
final Map<String, IndexMetadata> indexMetadataByName = project.indices();
107107

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public List<ShardIterator> searchShards(
116116
}
117117

118118
public Iterator<IndexShardRoutingTable> allWritableShards(ProjectState projectState, String index) {
119-
return allShardsReadyForWrites(projectState, index);
119+
return allWriteAddressableShards(projectState, index);
120120
}
121121

122122
public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId) {
@@ -163,29 +163,32 @@ private static void collectTargetShardsWithRouting(
163163

164164
private static void collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices, Set<IndexShardRoutingTable> set) {
165165
for (String index : concreteIndices) {
166-
Iterator<IndexShardRoutingTable> iterator = allShardsReadyForSearch(projectState, index);
166+
Iterator<IndexShardRoutingTable> iterator = allSearchAddressableShards(projectState, index);
167167
while (iterator.hasNext()) {
168168
set.add(iterator.next());
169169
}
170170
}
171171
}
172172

173173
/**
174-
* Returns an iterator of shards of the index that are ready to execute search requests.
175-
* A shard may not be ready to execute these operations during processes like resharding.
174+
* Returns an iterator of shards that can possibly serve searches. A shard may not be addressable during processes like resharding.
175+
* This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned.
176176
*/
177-
private static Iterator<IndexShardRoutingTable> allShardsReadyForSearch(ProjectState projectState, String index) {
177+
private static Iterator<IndexShardRoutingTable> allSearchAddressableShards(ProjectState projectState, String index) {
178178
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.SPLIT);
179179
}
180180

181181
/**
182-
* Returns an iterator of shards of the index that are ready to execute write requests.
183-
* A shard may not be ready to execute these operations during processes like resharding.
182+
* Returns an iterator of shards that can possibly serve writes. A shard may not be addressable during processes like resharding.
183+
* This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned.
184184
*/
185-
private static Iterator<IndexShardRoutingTable> allShardsReadyForWrites(ProjectState projectState, String index) {
185+
private static Iterator<IndexShardRoutingTable> allWriteAddressableShards(ProjectState projectState, String index) {
186186
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.HANDOFF);
187187
}
188188

189+
/**
190+
* Filters shards based on their state in resharding metadata. If resharing metadata is not present returns all shards.
191+
*/
189192
private static Iterator<IndexShardRoutingTable> allShardsExceptSplitTargetsInStateBefore(
190193
ProjectState projectState,
191194
String index,

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.elasticsearch.action.RoutingMissingException;
1515
import org.elasticsearch.action.index.IndexRequest;
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
18+
import org.elasticsearch.cluster.metadata.IndexReshardingState;
1719
import org.elasticsearch.common.bytes.BytesReference;
1820
import org.elasticsearch.core.Nullable;
1921
import org.elasticsearch.index.IndexMode;
@@ -30,6 +32,7 @@
3032
import org.elasticsearch.xcontent.support.MapXContentParser;
3133

3234
import java.io.IOException;
35+
import java.util.ArrayList;
3336
import java.util.Arrays;
3437
import java.util.HashMap;
3538
import java.util.HashSet;
@@ -38,6 +41,7 @@
3841
import java.util.Set;
3942
import java.util.TreeMap;
4043
import java.util.concurrent.atomic.AtomicInteger;
44+
import java.util.function.Function;
4145

4246
import static org.hamcrest.Matchers.equalTo;
4347
import static org.hamcrest.Matchers.hasSize;
@@ -687,6 +691,192 @@ public void testRoutingPathLogsdb() throws IOException {
687691
assertEquals(expectedShard, routing.getShard(req.id(), null));
688692
}
689693

694+
public void testCollectSearchShardsUnpartitionedWithResharding() throws IOException {
695+
int shards = 1;
696+
int newShardCount = 2;
697+
var initialRouting = IndexRouting.fromIndexMetadata(
698+
IndexMetadata.builder("test")
699+
.settings(indexSettings(IndexVersion.current(), 2, 1))
700+
.numberOfShards(newShardCount)
701+
.numberOfReplicas(1)
702+
.build()
703+
);
704+
705+
var shardToRouting = new HashMap<Integer, String>();
706+
do {
707+
var routing = randomAlphaOfLength(5);
708+
var shard = initialRouting.indexShard("dummy", routing, null, null);
709+
if (shardToRouting.containsKey(shard) == false) {
710+
shardToRouting.put(shard, routing);
711+
}
712+
} while (shardToRouting.size() < newShardCount);
713+
714+
var initialReshardingRouting = IndexRouting.fromIndexMetadata(
715+
IndexMetadata.builder("test")
716+
.settings(indexSettings(IndexVersion.current(), newShardCount, 1))
717+
.numberOfShards(newShardCount)
718+
.numberOfReplicas(1)
719+
.reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(shards, 2))
720+
.build()
721+
);
722+
723+
for (var shardAndRouting : shardToRouting.entrySet()) {
724+
var collectedShards = new ArrayList<>();
725+
initialReshardingRouting.collectSearchShards(shardAndRouting.getValue(), collectedShards::add);
726+
assertEquals(1, collectedShards.size());
727+
// Rerouting is in effect due to resharding metadata having a shard in CLONE state.
728+
assertEquals(0, collectedShards.get(0));
729+
}
730+
731+
var reshardingRoutingWithShardInHandoff = IndexRouting.fromIndexMetadata(
732+
IndexMetadata.builder("test")
733+
.settings(indexSettings(IndexVersion.current(), newShardCount, 1))
734+
.numberOfShards(newShardCount)
735+
.numberOfReplicas(1)
736+
.reshardingMetadata(
737+
IndexReshardingMetadata.newSplitByMultiple(shards, 2)
738+
.transitionSplitTargetToNewState(new ShardId("test", "na", 1), IndexReshardingState.Split.TargetShardState.HANDOFF)
739+
)
740+
.build()
741+
);
742+
743+
for (var shardAndRouting : shardToRouting.entrySet()) {
744+
var collectedShards = new ArrayList<>();
745+
reshardingRoutingWithShardInHandoff.collectSearchShards(shardAndRouting.getValue(), collectedShards::add);
746+
assertEquals(1, collectedShards.size());
747+
// Rerouting is in effect due to resharding metadata having a shard in CLONE state.
748+
assertEquals(0, collectedShards.get(0));
749+
}
750+
751+
var reshardingRoutingWithShardInSplit = IndexRouting.fromIndexMetadata(
752+
IndexMetadata.builder("test")
753+
.settings(indexSettings(IndexVersion.current(), newShardCount, 1))
754+
.numberOfShards(newShardCount)
755+
.numberOfReplicas(1)
756+
.reshardingMetadata(
757+
IndexReshardingMetadata.newSplitByMultiple(shards, 2)
758+
.transitionSplitTargetToNewState(new ShardId("test", "na", 1), IndexReshardingState.Split.TargetShardState.HANDOFF)
759+
.transitionSplitTargetToNewState(new ShardId("test", "na", 1), IndexReshardingState.Split.TargetShardState.SPLIT)
760+
)
761+
.build()
762+
);
763+
764+
for (var shardAndRouting : shardToRouting.entrySet()) {
765+
var collectedShards = new ArrayList<>();
766+
reshardingRoutingWithShardInSplit.collectSearchShards(shardAndRouting.getValue(), collectedShards::add);
767+
assertEquals(1, collectedShards.size());
768+
// Rerouting is no longer in effect since resharding metadata has SPLIT state for this shard
769+
assertEquals(shardAndRouting.getKey(), collectedShards.get(0));
770+
}
771+
}
772+
773+
public void testCollectSearchShardsPartitionedWithResharding() throws IOException {
774+
int preReshardShards = 4;
775+
int postReshardShards = 8;
776+
var initialRouting = IndexRouting.fromIndexMetadata(
777+
IndexMetadata.builder("test")
778+
.settings(indexSettings(IndexVersion.current(), 8, 1))
779+
.numberOfShards(postReshardShards)
780+
.numberOfReplicas(1)
781+
.routingPartitionSize(2)
782+
.build()
783+
);
784+
785+
var shardToRouting = new TreeMap<Integer, String>();
786+
do {
787+
var routing = randomAlphaOfLength(5);
788+
var shard = initialRouting.indexShard("dummy", routing, null, null);
789+
if (shardToRouting.containsKey(shard) == false) {
790+
shardToRouting.put(shard, routing);
791+
}
792+
} while (shardToRouting.size() < postReshardShards);
793+
794+
var initialReshardingRouting = IndexRouting.fromIndexMetadata(
795+
IndexMetadata.builder("test")
796+
.settings(indexSettings(IndexVersion.current(), postReshardShards, 1))
797+
.numberOfShards(postReshardShards)
798+
.numberOfReplicas(1)
799+
.routingPartitionSize(2)
800+
.reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2))
801+
.build()
802+
);
803+
804+
// Since we increased number of shard by two this is the adjustment logic.
805+
Function<Integer, Integer> adjustForResharding = i -> i < preReshardShards
806+
? i
807+
: IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2).getSplit().sourceShard(i);
808+
809+
// Rerouting is in effect due to resharding metadata having a shard in CLONE state.
810+
// We won't see shard 4 and above.
811+
for (var shardAndRouting : shardToRouting.entrySet()) {
812+
var collectedShards = new ArrayList<Integer>();
813+
initialReshardingRouting.collectSearchShards(shardAndRouting.getValue(), collectedShards::add);
814+
assertEquals(2, collectedShards.size());
815+
816+
var expected = new ArrayList<Integer>();
817+
expected.add(adjustForResharding.apply(shardAndRouting.getKey()));
818+
expected.add(adjustForResharding.apply(shardAndRouting.getKey() + 1));
819+
820+
assertEquals(expected, collectedShards);
821+
}
822+
823+
var reshardingRoutingWithShardInHandoff = IndexRouting.fromIndexMetadata(
824+
IndexMetadata.builder("test")
825+
.settings(indexSettings(IndexVersion.current(), postReshardShards, 1))
826+
.numberOfShards(postReshardShards)
827+
.numberOfReplicas(1)
828+
.routingPartitionSize(2)
829+
.reshardingMetadata(
830+
IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2)
831+
.transitionSplitTargetToNewState(new ShardId("test", "na", 4), IndexReshardingState.Split.TargetShardState.HANDOFF)
832+
)
833+
.build()
834+
);
835+
836+
for (var shardAndRouting : shardToRouting.entrySet()) {
837+
var collectedShards = new ArrayList<Integer>();
838+
reshardingRoutingWithShardInHandoff.collectSearchShards(shardAndRouting.getValue(), collectedShards::add);
839+
assertEquals(2, collectedShards.size());
840+
// Rerouting is in effect due to resharding metadata having a shard in CLONE state.
841+
// We won't see shard 4 and above.
842+
843+
var expected = new ArrayList<Integer>();
844+
expected.add(adjustForResharding.apply(shardAndRouting.getKey()));
845+
expected.add(adjustForResharding.apply(shardAndRouting.getKey() + 1));
846+
847+
assertEquals(expected, collectedShards);
848+
}
849+
850+
var reshardingRoutingWithShardInSplit = IndexRouting.fromIndexMetadata(
851+
IndexMetadata.builder("test")
852+
.settings(indexSettings(IndexVersion.current(), postReshardShards, 1))
853+
.numberOfShards(postReshardShards)
854+
.numberOfReplicas(1)
855+
.routingPartitionSize(2)
856+
.reshardingMetadata(
857+
IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2)
858+
.transitionSplitTargetToNewState(new ShardId("test", "na", 4), IndexReshardingState.Split.TargetShardState.HANDOFF)
859+
.transitionSplitTargetToNewState(new ShardId("test", "na", 4), IndexReshardingState.Split.TargetShardState.SPLIT)
860+
)
861+
.build()
862+
);
863+
864+
// Shard 4 can now be included in routing too based on the resharding metadata, adjust the rule.
865+
adjustForResharding = i -> i < 5 ? i : IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2).getSplit().sourceShard(i);
866+
867+
for (var shardAndRouting : shardToRouting.entrySet()) {
868+
var collectedShards = new ArrayList<Integer>();
869+
reshardingRoutingWithShardInSplit.collectSearchShards(shardAndRouting.getValue(), collectedShards::add);
870+
assertEquals(2, collectedShards.size());
871+
872+
var expected = new ArrayList<Integer>();
873+
expected.add(adjustForResharding.apply(shardAndRouting.getKey()));
874+
expected.add(adjustForResharding.apply(shardAndRouting.getKey() + 1));
875+
876+
assertEquals(expected, collectedShards);
877+
}
878+
}
879+
690880
/**
691881
* Extract a shardId from an {@link IndexRouting} that extracts routingusing a randomly
692882
* chosen method. All of the random methods <strong>should</strong> return the

0 commit comments

Comments
 (0)