|
15 | 15 | import org.elasticsearch.action.RoutingMissingException; |
16 | 16 | import org.elasticsearch.action.index.IndexRequest; |
17 | 17 | import org.elasticsearch.cluster.metadata.IndexMetadata; |
| 18 | +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; |
| 19 | +import org.elasticsearch.cluster.metadata.IndexReshardingState; |
18 | 20 | import org.elasticsearch.cluster.metadata.MappingMetadata; |
19 | 21 | import org.elasticsearch.common.ParsingException; |
20 | 22 | import org.elasticsearch.common.Strings; |
@@ -73,11 +75,13 @@ public static IndexRouting fromIndexMetadata(IndexMetadata metadata) { |
73 | 75 | protected final String indexName; |
74 | 76 | private final int routingNumShards; |
75 | 77 | private final int routingFactor; |
| 78 | + private final IndexReshardingMetadata indexReshardingMetadata; |
76 | 79 |
|
77 | 80 | private IndexRouting(IndexMetadata metadata) { |
78 | 81 | this.indexName = metadata.getIndex().getName(); |
79 | 82 | this.routingNumShards = metadata.getRoutingNumShards(); |
80 | 83 | this.routingFactor = metadata.getRoutingFactor(); |
| 84 | + this.indexReshardingMetadata = metadata.getReshardingMetadata(); |
81 | 85 | } |
82 | 86 |
|
83 | 87 | /** |
@@ -149,6 +153,23 @@ private static int effectiveRoutingToHash(String effectiveRouting) { |
149 | 153 | */ |
150 | 154 | public void checkIndexSplitAllowed() {} |
151 | 155 |
|
| 156 | + /** |
| 157 | + * If this index is in the process of resharding, and the shard to which this request is being routed, |
| 158 | + * is a target shard that is not yet in HANDOFF state, then route it to the source shard. |
| 159 | + * @param shardId shardId to which the current document is routed based on hashing |
| 160 | + * @return Updated shardId |
| 161 | + */ |
| 162 | + protected final int rerouteIfResharding(int shardId) { |
| 163 | + if (indexReshardingMetadata != null && indexReshardingMetadata.getSplit().isTargetShard(shardId)) { |
| 164 | + assert indexReshardingMetadata.isSplit() : "Index resharding state is not a split"; |
| 165 | + if (indexReshardingMetadata.getSplit() |
| 166 | + .targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) == false) { |
| 167 | + return indexReshardingMetadata.getSplit().sourceShard(shardId); |
| 168 | + } |
| 169 | + } |
| 170 | + return shardId; |
| 171 | + } |
| 172 | + |
152 | 173 | private abstract static class IdAndRoutingOnly extends IndexRouting { |
153 | 174 | private final boolean routingRequired; |
154 | 175 | private final IndexVersion creationVersion; |
@@ -195,19 +216,22 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy |
195 | 216 | throw new IllegalStateException("id is required and should have been set by process"); |
196 | 217 | } |
197 | 218 | checkRoutingRequired(id, routing); |
198 | | - return shardId(id, routing); |
| 219 | + int shardId = shardId(id, routing); |
| 220 | + return rerouteIfResharding(shardId); |
199 | 221 | } |
200 | 222 |
|
201 | 223 | @Override |
202 | 224 | public int updateShard(String id, @Nullable String routing) { |
203 | 225 | checkRoutingRequired(id, routing); |
204 | | - return shardId(id, routing); |
| 226 | + int shardId = shardId(id, routing); |
| 227 | + return rerouteIfResharding(shardId); |
205 | 228 | } |
206 | 229 |
|
207 | 230 | @Override |
208 | 231 | public int deleteShard(String id, @Nullable String routing) { |
209 | 232 | checkRoutingRequired(id, routing); |
210 | | - return shardId(id, routing); |
| 233 | + int shardId = shardId(id, routing); |
| 234 | + return rerouteIfResharding(shardId); |
211 | 235 | } |
212 | 236 |
|
213 | 237 | @Override |
@@ -314,7 +338,8 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy |
314 | 338 | assert Transports.assertNotTransportThread("parsing the _source can get slow"); |
315 | 339 | checkNoRouting(routing); |
316 | 340 | hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty); |
317 | | - return hashToShardId(hash); |
| 341 | + int shardId = hashToShardId(hash); |
| 342 | + return (rerouteIfResharding(shardId)); |
318 | 343 | } |
319 | 344 |
|
320 | 345 | public String createId(XContentType sourceType, BytesReference source, byte[] suffix) { |
@@ -454,13 +479,15 @@ public int updateShard(String id, @Nullable String routing) { |
454 | 479 | @Override |
455 | 480 | public int deleteShard(String id, @Nullable String routing) { |
456 | 481 | checkNoRouting(routing); |
457 | | - return idToHash(id); |
| 482 | + int shardId = idToHash(id); |
| 483 | + return (rerouteIfResharding(shardId)); |
458 | 484 | } |
459 | 485 |
|
460 | 486 | @Override |
461 | 487 | public int getShard(String id, @Nullable String routing) { |
462 | 488 | checkNoRouting(routing); |
463 | | - return idToHash(id); |
| 489 | + int shardId = idToHash(id); |
| 490 | + return (rerouteIfResharding(shardId)); |
464 | 491 | } |
465 | 492 |
|
466 | 493 | private void checkNoRouting(@Nullable String routing) { |
|
0 commit comments