Skip to content

Commit 543b612

Browse files
authored
Handle Stale Requests (elastic#141301)
Handles requests (index/flush/refresh) that are "very" stale by rejecting them. This means requests that are more than 1 reshard operation behind. This includes stale requests that arrive after the resharding metadata is gone (even if a new split has not started).
1 parent 61f0762 commit 543b612

File tree

13 files changed

+153
-43
lines changed

13 files changed

+153
-43
lines changed

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.lucene.store.LockObtainFailedException;
1717
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
1818
import org.elasticsearch.action.support.replication.ReplicationOperation;
19+
import org.elasticsearch.action.support.replication.StaleRequestException;
1920
import org.elasticsearch.cluster.RemoteException;
2021
import org.elasticsearch.cluster.action.shard.ShardStateAction;
2122
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
@@ -78,6 +79,7 @@
7879
import static java.util.Collections.emptyMap;
7980
import static java.util.Collections.singletonMap;
8081
import static java.util.Collections.unmodifiableMap;
82+
import static org.elasticsearch.action.support.replication.ReplicationSplitHelper.STALE_REQUEST_EXCEPTION_VERSION;
8183
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
8284
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.INDEX_LIMIT_EXCEEDED_EXCEPTION_VERSION;
8385
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
@@ -2049,7 +2051,8 @@ private enum ElasticsearchExceptionHandle {
20492051
IndexLimitExceededException::new,
20502052
186,
20512053
INDEX_LIMIT_EXCEEDED_EXCEPTION_VERSION
2052-
);
2054+
),
2055+
STALE_REQUEST_EXCEPTION(StaleRequestException.class, StaleRequestException::new, 187, STALE_REQUEST_EXCEPTION_VERSION);
20532056

20542057
final Class<? extends ElasticsearchException> exceptionClass;
20552058
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.support.ActionFilters;
15-
import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper;
15+
import org.elasticsearch.action.support.replication.BroadcastRequestSplitHelper;
1616
import org.elasticsearch.action.support.replication.ReplicationResponse;
1717
import org.elasticsearch.action.support.replication.TransportReplicationAction;
1818
import org.elasticsearch.cluster.action.shard.ShardStateAction;
19+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1920
import org.elasticsearch.cluster.project.ProjectResolver;
2021
import org.elasticsearch.cluster.service.ClusterService;
2122
import org.elasticsearch.common.io.stream.StreamInput;
@@ -85,14 +86,20 @@ protected void shardOperationOnPrimary(
8586
}));
8687
}
8788

88-
// We are here because there was a mismatch between the SplitShardCountSummary in the request
89-
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
90-
// the current state.
89+
/**
90+
* We are here because there was mismatch between the SplitShardCountSummary in the request
91+
* and that on the primary shard node. In other words, the primary shard has moved ahead due to a split reshard
92+
* operation after the request was created by the coordinator.
93+
* We can assume that the request is exactly 1 reshard split behind the current state of the primary shard.
94+
* This is because requests that are more than 1 reshard operation behind are rejected in
95+
* {@link org.elasticsearch.action.support.replication.ReplicationSplitHelper
96+
* #needsSplitCoordination(org.apache.logging.log4j.Logger, ReplicationRequest, IndexMetadata)}
97+
*/
9198
@Override
92-
protected Map<ShardId, ShardFlushRequest> splitRequestOnPrimary(ShardFlushRequest request) {
93-
return ReplicationRequestSplitHelper.splitRequest(
99+
protected Map<ShardId, ShardFlushRequest> splitRequestOnPrimary(ShardFlushRequest request, ProjectMetadata project) {
100+
return BroadcastRequestSplitHelper.splitRequest(
94101
request,
95-
projectResolver.getProjectMetadata(clusterService.state()),
102+
project,
96103
(targetShard, shardCountSummary) -> new ShardFlushRequest(request.getRequest(), targetShard, shardCountSummary)
97104
);
98105
}
@@ -103,7 +110,7 @@ protected Tuple<ReplicationResponse, Exception> combineSplitResponses(
103110
Map<ShardId, ShardFlushRequest> splitRequests,
104111
Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
105112
) {
106-
return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
113+
return BroadcastRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
107114
}
108115

109116
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
import org.elasticsearch.action.ActionType;
1616
import org.elasticsearch.action.support.ActionFilters;
1717
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
18+
import org.elasticsearch.action.support.replication.BroadcastRequestSplitHelper;
1819
import org.elasticsearch.action.support.replication.ReplicationOperation;
19-
import org.elasticsearch.action.support.replication.ReplicationRequestSplitHelper;
2020
import org.elasticsearch.action.support.replication.ReplicationResponse;
2121
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2222
import org.elasticsearch.cluster.action.shard.ShardStateAction;
23+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2324
import org.elasticsearch.cluster.project.ProjectResolver;
2425
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2526
import org.elasticsearch.cluster.service.ClusterService;
@@ -112,14 +113,20 @@ protected void shardOperationOnPrimary(
112113
}));
113114
}
114115

115-
// We are here because there was mismatch between the SplitShardCountSummary in the request
116-
// and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
117-
// the current state.
116+
/**
117+
* We are here because there was mismatch between the SplitShardCountSummary in the request
118+
* and that on the primary shard node. In other words, the primary shard has moved ahead due to a split reshard
119+
* operation after the request was created by the coordinator.
120+
* We can assume that the request is exactly 1 reshard split behind the current state of the primary shard.
121+
* This is because requests that are more than 1 reshard operation behind are rejected in
122+
* {@link org.elasticsearch.action.support.replication.ReplicationSplitHelper
123+
* #needsSplitCoordination(org.apache.logging.log4j.Logger, ReplicationRequest, IndexMetadata)}
124+
*/
118125
@Override
119-
protected Map<ShardId, BasicReplicationRequest> splitRequestOnPrimary(BasicReplicationRequest request) {
120-
return ReplicationRequestSplitHelper.splitRequest(
126+
protected Map<ShardId, BasicReplicationRequest> splitRequestOnPrimary(BasicReplicationRequest request, ProjectMetadata project) {
127+
return BroadcastRequestSplitHelper.splitRequest(
121128
request,
122-
projectResolver.getProjectMetadata(clusterService.state()),
129+
project,
123130
(targetShard, shardCountSummary) -> new BasicReplicationRequest(targetShard, shardCountSummary)
124131
);
125132
}
@@ -130,7 +137,7 @@ protected Tuple<ReplicationResponse, Exception> combineSplitResponses(
130137
Map<ShardId, BasicReplicationRequest> splitRequests,
131138
Map<ShardId, Tuple<ReplicationResponse, Exception>> responses
132139
) {
133-
return ReplicationRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
140+
return BroadcastRequestSplitHelper.combineSplitResponses(originalRequest, splitRequests, responses);
134141
}
135142

136143
@Override

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.ClusterStateObserver;
3434
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
3535
import org.elasticsearch.cluster.action.shard.ShardStateAction;
36+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
3637
import org.elasticsearch.cluster.project.ProjectResolver;
3738
import org.elasticsearch.cluster.service.ClusterService;
3839
import org.elasticsearch.common.bytes.BytesReference;
@@ -165,8 +166,8 @@ protected void shardOperationOnPrimary(
165166
}
166167

167168
@Override
168-
protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest request) {
169-
return ShardBulkSplitHelper.splitRequests(request, projectResolver.getProjectMetadata(clusterService.state()));
169+
protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest request, ProjectMetadata project) {
170+
return ShardBulkSplitHelper.splitRequests(request, project);
170171
}
171172

172173
@Override

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestSplitHelper.java renamed to server/src/main/java/org/elasticsearch/action/support/replication/BroadcastRequestSplitHelper.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.support.replication;
1111

12+
import org.apache.logging.log4j.Logger;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1415
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
@@ -22,8 +23,17 @@
2223
import java.util.Map;
2324
import java.util.function.BiFunction;
2425

25-
public final class ReplicationRequestSplitHelper {
26-
private ReplicationRequestSplitHelper() {}
26+
/**
27+
* This class implements the logic to "split", rather forward, Broadcast requests like refresh and flush
28+
* to the target nodes after a Resharding operation. It also implements the logic to combine responses from
29+
* source and target shards.
30+
*
31+
* The logic to split indexing requests is different because each document id has to be inspected to route it
32+
* to the correct shard post Resharding. That logic is implemented in
33+
* {@link org.elasticsearch.action.bulk.ShardBulkSplitHelper}
34+
*/
35+
public final class BroadcastRequestSplitHelper {
36+
private BroadcastRequestSplitHelper() {}
2737

2838
/**
2939
* Given a stale Replication Request, like flush or refresh, split it into multiple requests,
@@ -32,6 +42,11 @@ private ReplicationRequestSplitHelper() {}
3242
* {@link org.elasticsearch.action.bulk.BulkShardRequest}
3343
* We are here because there was a mismatch between the SplitShardCountSummary in the request
3444
* and that on the primary shard node.
45+
*
46+
* Note that {@link org.elasticsearch.cluster.metadata.IndexReshardingMetadata} cannot be NULL here
47+
* because it is evaluated in {@link ReplicationSplitHelper#needsSplitCoordination(Logger, ReplicationRequest, IndexMetadata)}
48+
* with the same metadata before arriving here. The request would have been rejected there if there was no Resharding metadata found.
49+
*
3550
* TODO:
3651
* We assume here that the request is exactly 1 reshard split behind
3752
* the current state. We might either revise this assumption or enforce it
@@ -52,7 +67,7 @@ public static <T extends ReplicationRequest<T>> Map<ShardId, T> splitRequest(
5267
// Create a request for original source shard and for each target shard.
5368
// New requests that are to be handled by target shards should contain the
5469
// latest ShardCountSummary.
55-
// TODO: This will not work if the reshard metadata is gone
70+
assert indexMetadata.getReshardingMetadata() != null;
5671
int targetShardId = indexMetadata.getReshardingMetadata().getSplit().targetShard(sourceShard.id());
5772
ShardId targetShard = new ShardId(sourceShard.getIndex(), targetShardId);
5873

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationSplitHelper.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111

1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.ExceptionsHelper;
14+
import org.elasticsearch.TransportVersion;
1415
import org.elasticsearch.action.ActionListener;
1516
import org.elasticsearch.action.support.RetryableAction;
1617
import org.elasticsearch.cluster.ClusterState;
1718
import org.elasticsearch.cluster.metadata.IndexMetadata;
19+
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
1820
import org.elasticsearch.cluster.metadata.ProjectId;
1921
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2022
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -42,6 +44,7 @@ public class ReplicationSplitHelper<
4244
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
4345
Response extends ReplicationResponse> {
4446

47+
public static TransportVersion STALE_REQUEST_EXCEPTION_VERSION = TransportVersion.fromName("stale_request_exception");
4548
private final Logger logger;
4649
private final ClusterService clusterService;
4750
private final TimeValue initialRetryBackoffBound;
@@ -66,14 +69,38 @@ public ReplicationSplitHelper(
6669
}
6770

6871
public static <Request extends ReplicationRequest<Request>> boolean needsSplitCoordination(
72+
final Logger logger,
6973
final Request primaryRequest,
7074
final IndexMetadata indexMetadata
71-
) {
75+
) throws Exception {
7276
SplitShardCountSummary requestSplitSummary = primaryRequest.reshardSplitShardCountSummary();
73-
// TODO: We currently only set the request split summary transport shard bulk. Only evaluate this at the moment or else every
77+
// TODO: We currently only set the request split summary for certain Replication Requests
78+
// like refresh, flush and shard bulk requests. Only evaluate this when set or else every
7479
// request would say it needs a split.
75-
return requestSplitSummary.isUnset() == false
76-
&& requestSplitSummary.equals(SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.shardId().getId())) == false;
80+
if (requestSplitSummary.isUnset()) { // no split coordination required
81+
return false;
82+
}
83+
84+
SplitShardCountSummary latestSplitSummary = SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.shardId().getId());
85+
if (requestSplitSummary.equals(latestSplitSummary)) { // no split coordination required
86+
return false;
87+
} else { // check that resharding is ongoing and the latest shard count is exactly 2 times the shard count in the request
88+
if (indexMetadata.getReshardingMetadata() == null
89+
|| latestSplitSummary.asInt() != IndexReshardingMetadata.RESHARD_SPLIT_FACTOR * requestSplitSummary.asInt()) {
90+
if (indexMetadata.getReshardingMetadata() == null) {
91+
logger.debug("Request is stale, expected resharding metadata but none found");
92+
} else {
93+
logger.debug(
94+
"Request is stale due to concurrent reshard operation, expected shardCountSummary [{}] but found [{}]",
95+
requestSplitSummary.asInt(),
96+
latestSplitSummary.asInt()
97+
);
98+
}
99+
throw new StaleRequestException(primaryRequest.index());
100+
}
101+
return true;
102+
}
103+
77104
}
78105

79106
@FunctionalInterface
@@ -137,7 +164,7 @@ public SplitCoordinator(
137164
}
138165

139166
public void coordinate() throws Exception {
140-
Map<ShardId, Request> splitRequests = action.splitRequestOnPrimary(originalRequest);
167+
Map<ShardId, Request> splitRequests = action.splitRequestOnPrimary(originalRequest, project);
141168

142169
int numSplitRequests = splitRequests.size();
143170

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.support.replication;
11+
12+
import org.elasticsearch.ElasticsearchException;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.rest.RestStatus;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* An exception indicating a stale request during resharding.
20+
*/
21+
public class StaleRequestException extends ElasticsearchException {
22+
23+
public StaleRequestException(String index) {
24+
super("Request for index [{}] is stale due to concurrent reshard operation, retry after sometime", index);
25+
}
26+
27+
public StaleRequestException(StreamInput in) throws IOException {
28+
super(in);
29+
}
30+
31+
@Override
32+
public final RestStatus status() {
33+
return RestStatus.SERVICE_UNAVAILABLE;
34+
}
35+
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,14 @@ protected abstract void shardOperationOnReplica(
339339
/**
340340
* During Resharding, we might need to split the primary request.
341341
* We are here because there was mismatch between the SplitShardCountSummary in the request
342-
* and that on the primary shard node. We assume that the request is exactly 1 reshard split behind
343-
* the current state.
342+
* and that on the primary shard node. In other words, the primary shard has moved ahead due to a split reshard
343+
* operation after the request was created by the coordinator.
344+
* We can assume that the request is exactly 1 reshard split behind the current state of the primary shard.
345+
* This is because requests that are more than 1 reshard operation behind are rejected in
346+
* {@link org.elasticsearch.action.support.replication.ReplicationSplitHelper
347+
* #needsSplitCoordination(org.apache.logging.log4j.Logger, ReplicationRequest, IndexMetadata)}
344348
*/
345-
protected Map<ShardId, Request> splitRequestOnPrimary(Request request) {
349+
protected Map<ShardId, Request> splitRequestOnPrimary(Request request, ProjectMetadata project) {
346350
return Map.of(request.shardId(), request);
347351
}
348352

@@ -533,7 +537,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
533537
TransportResponseHandler.TRANSPORT_WORKER
534538
)
535539
);
536-
} else if (ReplicationSplitHelper.needsSplitCoordination(primaryRequest.getRequest(), indexMetadata)) {
540+
} else if (ReplicationSplitHelper.needsSplitCoordination(logger, primaryRequest.getRequest(), indexMetadata)) {
537541
ReplicationSplitHelper<Request, ReplicaRequest, Response>.SplitCoordinator splitCoordinator = splitHelper
538542
.newSplitRequest(
539543
TransportReplicationAction.this,

server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@
9191
public class IndexReshardingMetadata implements ToXContentFragment, Writeable {
9292
private static final String SPLIT_FIELD_NAME = "split";
9393
private static final ParseField SPLIT_FIELD = new ParseField(SPLIT_FIELD_NAME);
94+
95+
// During a reshard split operation, we restrict the split factor to be exactly 2 i.e
96+
// we can go from 1 -> 2 shards, or 2 -> 4 shards, and so on.
97+
public static final int RESHARD_SPLIT_FACTOR = 2;
98+
9499
// This exists only so that tests can verify that IndexReshardingMetadata supports more than one kind of operation.
95100
// It can be removed when we have defined a second real operation, such as shrink.
96101
private static final String NOOP_FIELD_NAME = "noop";
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9274000

0 commit comments

Comments
 (0)