diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java index 3df0d2d65b657..050181802af8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java @@ -15,6 +15,8 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; @@ -73,11 +75,13 @@ public static IndexRouting fromIndexMetadata(IndexMetadata metadata) { protected final String indexName; private final int routingNumShards; private final int routingFactor; + private final IndexReshardingMetadata indexReshardingMetadata; private IndexRouting(IndexMetadata metadata) { this.indexName = metadata.getIndex().getName(); this.routingNumShards = metadata.getRoutingNumShards(); this.routingFactor = metadata.getRoutingFactor(); + this.indexReshardingMetadata = metadata.getReshardingMetadata(); } /** @@ -149,6 +153,23 @@ private static int effectiveRoutingToHash(String effectiveRouting) { */ public void checkIndexSplitAllowed() {} + /** + * If this index is in the process of resharding, and the shard to which this request is being routed, + * is a target shard that is not yet in HANDOFF state, then route it to the source shard. + * @param shardId shardId to which the current document is routed based on hashing + * @return Updated shardId + */ + protected final int rerouteIfResharding(int shardId) { + if (indexReshardingMetadata != null && indexReshardingMetadata.getSplit().isTargetShard(shardId)) { + assert indexReshardingMetadata.isSplit() : "Index resharding state is not a split"; + if (indexReshardingMetadata.getSplit() + .targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) == false) { + return indexReshardingMetadata.getSplit().sourceShard(shardId); + } + } + return shardId; + } + private abstract static class IdAndRoutingOnly extends IndexRouting { private final boolean routingRequired; private final IndexVersion creationVersion; @@ -195,19 +216,22 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy throw new IllegalStateException("id is required and should have been set by process"); } checkRoutingRequired(id, routing); - return shardId(id, routing); + int shardId = shardId(id, routing); + return rerouteIfResharding(shardId); } @Override public int updateShard(String id, @Nullable String routing) { checkRoutingRequired(id, routing); - return shardId(id, routing); + int shardId = shardId(id, routing); + return rerouteIfResharding(shardId); } @Override public int deleteShard(String id, @Nullable String routing) { checkRoutingRequired(id, routing); - return shardId(id, routing); + int shardId = shardId(id, routing); + return rerouteIfResharding(shardId); } @Override @@ -314,7 +338,8 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy assert Transports.assertNotTransportThread("parsing the _source can get slow"); checkNoRouting(routing); hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty); - return hashToShardId(hash); + int shardId = hashToShardId(hash); + return (rerouteIfResharding(shardId)); } public String createId(XContentType sourceType, BytesReference source, byte[] suffix) { @@ -454,13 +479,15 @@ public int updateShard(String id, @Nullable String routing) { @Override public int deleteShard(String id, @Nullable String routing) { checkNoRouting(routing); - return idToHash(id); + int shardId = idToHash(id); + return (rerouteIfResharding(shardId)); } @Override public int getShard(String id, @Nullable String routing) { checkNoRouting(routing); - return idToHash(id); + int shardId = idToHash(id); + return (rerouteIfResharding(shardId)); } private void checkNoRouting(@Nullable String routing) {