Skip to content

Commit aefb309

Browse files
authored
Fail realtime get request if it is stale due to resharding (elastic#141604)
During resharding, a get request may become stale if the shard that owns the requested document changes between the time that the request is formed and when it is processed. We can detect whether this has happened by comparing the current routing of the document based on the routing table to the shard that has actually processed the document. If there is a mismatch, we fail the request. Later this failure should be handled by internally retrying the request with a fresh route, but for correctness we do need to start by rejecting the request if it has been routed to the wrong shard.
1 parent f959a06 commit aefb309

File tree

6 files changed

+76
-32
lines changed

6 files changed

+76
-32
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
7272
getRequest.version(),
7373
getRequest.versionType(),
7474
getRequest.fetchSourceContext(),
75-
getRequest.isForceSyntheticSource()
75+
getRequest.isForceSyntheticSource(),
76+
getRequest.getSplitShardCountSummary()
7677
);
7778
long segmentGeneration = -1;
7879
if (result == null) {

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.support.ActionFilters;
1717
import org.elasticsearch.action.support.HandledTransportAction;
1818
import org.elasticsearch.action.support.TransportActions;
19+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1920
import org.elasticsearch.common.Strings;
2021
import org.elasticsearch.common.io.stream.StreamInput;
2122
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -75,7 +76,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
7576
item.version(),
7677
item.versionType(),
7778
item.fetchSourceContext(),
78-
multiGetShardRequest.isForceSyntheticSource()
79+
multiGetShardRequest.isForceSyntheticSource(),
80+
SplitShardCountSummary.UNSET
7981
);
8082
GetResponse getResponse = null;
8183
if (result == null) {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ public int deleteShard(String id, @Nullable String routing) {
276276
@Override
277277
public int getShard(String id, @Nullable String routing) {
278278
checkRoutingRequired(id, routing);
279-
return shardId(id, routing);
279+
int shardId = shardId(id, routing);
280+
return rerouteSearchIfResharding(shardId);
280281
}
281282

282283
private void checkRoutingRequired(String id, @Nullable String routing) {

server/src/main/java/org/elasticsearch/index/get/ShardGetService.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -222,36 +222,35 @@ private GetResult doGet(
222222
} else {
223223
missingMetric.inc(System.nanoTime() - now);
224224
}
225-
if (getResult == null || getResult.isExists() == false) {
226-
// during resharding, a coordinating node may route a get request to a shard that is the source of a split
227-
// before the target shard has taken over that document, but by the time the request is processed on the
228-
// source shard, handoff has occurred. If the get succeeds, this is fine - we block refreshes during this
229-
// period so although indexing may have occured on the target shard, the source shard's copy is still valid
230-
// because refresh hasn't happened. However, if the source shard doesn't have the document in this case, it
231-
// may be that it is because the shard has deleted unowned documents after the split. Normally this shouldn't
232-
// occur because we will delay deleting those documents for some grace period, but if it does happen we should
233-
// fail the request rather than possibly incorrectly reporting that the document doesn't exist, when it may
234-
// in fact be present on a different shard.
235-
// We can defer this check until we know whether the response is missing, so that the common case doesn't
236-
// have to resolve the current split shard summary. This way also means that a race between the get and handoff state
237-
// can only cause us to throw an exception unnecessarily, rather than incorrectly returning a missing document.
238-
final var indexMetadata = mapperService.getIndexSettings().getIndexMetadata();
239-
final var currentSummary = SplitShardCountSummary.forSearch(indexMetadata, shardId().getId());
225+
if (getResult == null || getResult.isExists() == false || realtime) {
240226
if (splitShardCountSummary.equals(SplitShardCountSummary.UNSET)) {
241227
// TODO, this should only be possible temporarily, until we've ensured that all callers provide a valid summary.
242228
return getResult;
243229
}
230+
// during resharding, a coordinating node may route a get request to a shard that is the source of a split
231+
// before the target shard has taken over that document, but by the time the request is processed on the
232+
// source shard, handoff has occurred. If a non-realtime get succeeds, this is fine - we block refreshes during this
233+
// period so although the document may have been updated on the target shard, the source shard's copy is still valid
234+
// because refresh hasn't happened. However, if the get is a realtime get, or if the source shard doesn't have the
235+
// document (perhaps because the shard has deleted unowned documents after the split) then the answer may be
236+
// incorrect. So, if the request's split shard count summary indicates that routing has changed since the request
237+
// was formulated, we double check that the requested document still maps to this shard.
238+
final var indexMetadata = mapperService.getIndexSettings().getIndexMetadata();
239+
// For realtime get, correct results depend on index routing (the new shard may accept updates at handoff) and consult
240+
// the index shard's translog.
241+
// Regular search happens on the search shard and uses search routing.
242+
final var currentSummary = realtime
243+
? SplitShardCountSummary.forIndexing(indexMetadata, shardId().getId())
244+
: SplitShardCountSummary.forSearch(indexMetadata, shardId().getId());
244245
if (splitShardCountSummary.compareTo(currentSummary) >= 0) {
245246
// coordinator is current, so response is valid
246247
return getResult;
247248
}
248249
// Otherwise, recompute the route of the requested document based on current metadata and fail the request if it
249-
// doesn't map to this shard anymore, since delete unowned may have removed it by this point. This is conservative:
250-
// the document may genuinely not have existed, but unless we can be certain that delete-unowned hasn't run yet
251-
// (which is difficult, because the index shard may see the target move to done before the search shard) it is safer
252-
// to fail the request.
250+
// doesn't map to this shard anymore.
253251
final var indexRouting = IndexRouting.fromIndexMetadata(indexMetadata);
254-
final var docShard = indexRouting.getShard(id, routing);
252+
// see currentSummary above
253+
final var docShard = realtime ? indexRouting.updateShard(id, routing) : indexRouting.getShard(id, routing);
255254
if (docShard != shardId().getId()) {
256255
// XXX we may want a more specific exception type here
257256
throw new ElasticsearchStatusException("stale get request for document [" + id + "]", RestStatus.SERVICE_UNAVAILABLE);
@@ -272,7 +271,8 @@ public GetResult getFromTranslog(
272271
long version,
273272
VersionType versionType,
274273
FetchSourceContext fetchSourceContext,
275-
boolean forceSyntheticSource
274+
boolean forceSyntheticSource,
275+
SplitShardCountSummary splitShardCountSummary
276276
) throws IOException {
277277
return doGet(
278278
id,
@@ -285,7 +285,7 @@ public GetResult getFromTranslog(
285285
UNASSIGNED_PRIMARY_TERM,
286286
fetchSourceContext,
287287
forceSyntheticSource,
288-
SplitShardCountSummary.UNSET,
288+
splitShardCountSummary,
289289
indexShard::getFromTranslog
290290
);
291291
}

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ private static IndexRouting getSplitRouting(IndexMetadata startingMetadata) {
850850
);
851851
}
852852

853-
public void testCollectSearchShardsUnpartitionedWithResharding() throws IOException {
853+
public void testCollectSearchShardsUnpartitionedAndGetWithResharding() {
854854
int shards = 1;
855855
int newShardCount = 2;
856856
var initialRouting = IndexRouting.fromIndexMetadata(
@@ -885,6 +885,7 @@ public void testCollectSearchShardsUnpartitionedWithResharding() throws IOExcept
885885
assertEquals(1, collectedShards.size());
886886
// Rerouting is in effect due to resharding metadata having a shard in CLONE state.
887887
assertEquals(0, collectedShards.get(0));
888+
assertEquals(0, initialReshardingRouting.getShard(shardAndRouting.getValue(), null));
888889
}
889890

890891
var reshardingRoutingWithShardInHandoff = IndexRouting.fromIndexMetadata(
@@ -903,8 +904,9 @@ public void testCollectSearchShardsUnpartitionedWithResharding() throws IOExcept
903904
var collectedShards = new ArrayList<>();
904905
reshardingRoutingWithShardInHandoff.collectSearchShards(shardAndRouting.getValue(), collectedShards::add);
905906
assertEquals(1, collectedShards.size());
906-
// Rerouting is in effect due to resharding metadata having a shard in CLONE state.
907+
// Rerouting is in effect due to resharding metadata having a shard in HANDOFF state.
907908
assertEquals(0, collectedShards.get(0));
909+
assertEquals(0, reshardingRoutingWithShardInHandoff.getShard(shardAndRouting.getValue(), null));
908910
}
909911

910912
var reshardingRoutingWithShardInSplit = IndexRouting.fromIndexMetadata(
@@ -926,6 +928,7 @@ public void testCollectSearchShardsUnpartitionedWithResharding() throws IOExcept
926928
assertEquals(1, collectedShards.size());
927929
// Rerouting is no longer in effect since resharding metadata has SPLIT state for this shard
928930
assertEquals(shardAndRouting.getKey(), collectedShards.get(0));
931+
assertEquals(shardAndRouting.getKey().intValue(), reshardingRoutingWithShardInSplit.getShard(shardAndRouting.getValue(), null));
929932
}
930933
}
931934

server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,16 @@ public void testGetFromTranslog() throws IOException {
363363

364364
// Issue a get that would enforce safe access mode and switches the maps from unsafe to safe
365365
var getResult = primary.getService()
366-
.getFromTranslog("2", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false);
366+
.getFromTranslog(
367+
"2",
368+
new String[] { "foo" },
369+
true,
370+
1,
371+
VersionType.INTERNAL,
372+
FetchSourceContext.FETCH_SOURCE,
373+
false,
374+
SplitShardCountSummary.UNSET
375+
);
367376
assertNull(getResult);
368377
var lastUnsafeGeneration = engine.getLastUnsafeSegmentGenerationForGets();
369378
// last unsafe generation is set to last committed gen after the refresh triggered by realtime get
@@ -387,7 +396,8 @@ public void testGetFromTranslog() throws IOException {
387396
1,
388397
VersionType.INTERNAL,
389398
FetchSourceContext.FETCH_SOURCE,
390-
false
399+
false,
400+
SplitShardCountSummary.UNSET
391401
);
392402
assertNull(getResult);
393403
// But normal get would still work!
@@ -413,11 +423,29 @@ public void testGetFromTranslog() throws IOException {
413423
indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
414424
// The first get in safe mode, would trigger a refresh, since we need to start tracking translog locations in the live version map
415425
getResult = primary.getService()
416-
.getFromTranslog("1", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false);
426+
.getFromTranslog(
427+
"1",
428+
new String[] { "foo" },
429+
true,
430+
1,
431+
VersionType.INTERNAL,
432+
FetchSourceContext.FETCH_SOURCE,
433+
false,
434+
SplitShardCountSummary.UNSET
435+
);
417436
assertTrue(getResult.isExists());
418437
assertEquals(engine.getLastUnsafeSegmentGenerationForGets(), lastUnsafeGeneration);
419438
getResult = primary.getService()
420-
.getFromTranslog("2", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false);
439+
.getFromTranslog(
440+
"2",
441+
new String[] { "foo" },
442+
true,
443+
1,
444+
VersionType.INTERNAL,
445+
FetchSourceContext.FETCH_SOURCE,
446+
false,
447+
SplitShardCountSummary.UNSET
448+
);
421449
assertNull(getResult);
422450
assertEquals(engine.getLastUnsafeSegmentGenerationForGets(), lastUnsafeGeneration);
423451

@@ -435,7 +463,16 @@ public void testGetFromTranslog() throws IOException {
435463
assertFalse(LiveVersionMapTestUtils.isSafeAccessRequired(map));
436464
assertTrue(LiveVersionMapTestUtils.isUnsafe(map));
437465
getResult = primary.getService()
438-
.getFromTranslog("2", new String[] { "foo" }, true, 1, VersionType.INTERNAL, FetchSourceContext.FETCH_SOURCE, false);
466+
.getFromTranslog(
467+
"2",
468+
new String[] { "foo" },
469+
true,
470+
1,
471+
VersionType.INTERNAL,
472+
FetchSourceContext.FETCH_SOURCE,
473+
false,
474+
SplitShardCountSummary.UNSET
475+
);
439476
assertNull(getResult);
440477
var lastUnsafeGeneration2 = engine.getLastUnsafeSegmentGenerationForGets();
441478
assertTrue(lastUnsafeGeneration2 > lastUnsafeGeneration);

0 commit comments

Comments
 (0)