Skip to content

Commit 928fd32

Browse files
Merge branch 'main' into threadpool-merge-scheduler
2 parents a8f5297 + c02292f commit 928fd32

File tree

9 files changed

+186
-80
lines changed

9 files changed

+186
-80
lines changed

docs/changelog/120250.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 120250
2+
summary: "Retry internally when CAS upload is throttled [GCS]"
3+
area: Snapshot/Restore
4+
type: enhancement
5+
issues:
6+
- 116546

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.repositories.azure;
1111

12+
import com.sun.net.httpserver.Headers;
1213
import com.sun.net.httpserver.HttpExchange;
1314
import com.sun.net.httpserver.HttpHandler;
1415

@@ -21,6 +22,7 @@
2122
import org.elasticsearch.common.bytes.BytesReference;
2223
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.core.SuppressForbidden;
25+
import org.elasticsearch.http.ResponseInjectingHttpHandler;
2426
import org.elasticsearch.plugins.PluginsService;
2527
import org.elasticsearch.repositories.RepositoriesMetrics;
2628
import org.elasticsearch.repositories.RepositoriesService;
@@ -46,7 +48,6 @@
4648
import java.util.stream.IntStream;
4749

4850
import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
49-
import static org.elasticsearch.repositories.azure.ResponseInjectingAzureHttpHandler.createFailNRequestsHandler;
5051
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
5152
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5253
import static org.hamcrest.Matchers.hasSize;
@@ -60,15 +61,15 @@ public class AzureBlobStoreRepositoryMetricsTests extends AzureBlobStoreReposito
6061
);
6162
private static final int MAX_RETRIES = 3;
6263

63-
private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
64+
private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
6465

6566
@Override
6667
protected Map<String, HttpHandler> createHttpHandlers() {
6768
Map<String, HttpHandler> httpHandlers = super.createHttpHandlers();
6869
assert httpHandlers.size() == 1 : "This assumes there's a single handler";
6970
return httpHandlers.entrySet()
7071
.stream()
71-
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingAzureHttpHandler(requestHandlers, e.getValue())));
72+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingHttpHandler(requestHandlers, e.getValue())));
7273
}
7374

7475
/**
@@ -106,7 +107,7 @@ public void testThrottleResponsesAreCountedInMetrics() throws IOException {
106107
// Queue up some throttle responses
107108
final int numThrottles = randomIntBetween(1, MAX_RETRIES);
108109
IntStream.range(0, numThrottles)
109-
.forEach(i -> requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
110+
.forEach(i -> requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
110111

111112
// Check that the blob exists
112113
blobContainer.blobExists(purpose, blobName);
@@ -132,11 +133,7 @@ public void testRangeNotSatisfiedAreCountedInMetrics() throws IOException {
132133

133134
// Queue up a range-not-satisfied error
134135
requestHandlers.offer(
135-
new ResponseInjectingAzureHttpHandler.FixedRequestHandler(
136-
RestStatus.REQUESTED_RANGE_NOT_SATISFIED,
137-
null,
138-
GET_BLOB_REQUEST_PREDICATE
139-
)
136+
new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, null, GET_BLOB_REQUEST_PREDICATE)
140137
);
141138

142139
// Attempt to read the blob
@@ -169,7 +166,7 @@ public void testErrorResponsesAreCountedInMetrics() throws IOException {
169166
if (status == RestStatus.TOO_MANY_REQUESTS) {
170167
throttles.incrementAndGet();
171168
}
172-
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(status));
169+
requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(status));
173170
});
174171

175172
// Check that the blob exists
@@ -265,7 +262,7 @@ public void testBatchDeleteFailure() throws IOException {
265262
clearMetrics(dataNodeName);
266263

267264
// Handler will fail one or more of the batch requests
268-
final ResponseInjectingAzureHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
265+
final ResponseInjectingHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
269266

270267
// Exhaust the retries
271268
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
@@ -308,6 +305,35 @@ private MetricsAsserter metricsAsserter(
308305
return new MetricsAsserter(dataNodeName, operationPurpose, operation, repository);
309306
}
310307

308+
/**
309+
* Creates a {@link ResponseInjectingHttpHandler.RequestHandler} that will persistently fail the first <code>numberToFail</code>
310+
* distinct requests it sees. Any other requests are passed through to the delegate.
311+
*
312+
* @param numberToFail The number of requests to fail
313+
* @return the handler
314+
*/
315+
private static ResponseInjectingHttpHandler.RequestHandler createFailNRequestsHandler(int numberToFail) {
316+
final List<String> requestsToFail = new ArrayList<>(numberToFail);
317+
return (exchange, delegate) -> {
318+
final Headers requestHeaders = exchange.getRequestHeaders();
319+
final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
320+
boolean failRequest = false;
321+
synchronized (requestsToFail) {
322+
if (requestsToFail.contains(requestId)) {
323+
failRequest = true;
324+
} else if (requestsToFail.size() < numberToFail) {
325+
requestsToFail.add(requestId);
326+
failRequest = true;
327+
}
328+
}
329+
if (failRequest) {
330+
exchange.sendResponseHeaders(500, -1);
331+
} else {
332+
delegate.handle(exchange);
333+
}
334+
};
335+
}
336+
311337
private class MetricsAsserter {
312338
private final String dataNodeName;
313339
private final OperationPurpose purpose;

modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.blobstore.OperationPurpose;
1717
import org.elasticsearch.common.bytes.BytesReference;
1818
import org.elasticsearch.core.SuppressForbidden;
19+
import org.elasticsearch.http.ResponseInjectingHttpHandler;
1920
import org.elasticsearch.rest.RestStatus;
2021
import org.junit.Before;
2122

@@ -34,14 +35,14 @@
3435

3536
public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
3637

37-
private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
38+
private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
3839

3940
@SuppressForbidden(reason = "use a http server")
4041
@Before
4142
public void configureAzureHandler() {
4243
httpServer.createContext(
4344
"/",
44-
new ResponseInjectingAzureHttpHandler(
45+
new ResponseInjectingHttpHandler(
4546
requestHandlers,
4647
new AzureHttpHandler(ACCOUNT, CONTAINER, null, MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE)
4748
)
@@ -61,7 +62,7 @@ public void testRetriesAndOperationsAreTrackedSeparately() throws IOException {
6162
for (int i = 0; i < randomIntBetween(10, 50); i++) {
6263
final boolean triggerRetry = randomBoolean();
6364
if (triggerRetry) {
64-
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
65+
requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
6566
}
6667
final AzureBlobStore.Operation operation = randomFrom(supportedOperations);
6768
switch (operation) {

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.util.BytesRefBuilder;
2626
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
2727
import org.elasticsearch.cluster.service.ClusterService;
28+
import org.elasticsearch.common.BackoffPolicy;
2829
import org.elasticsearch.common.blobstore.BlobContainer;
2930
import org.elasticsearch.common.blobstore.BlobPath;
3031
import org.elasticsearch.common.blobstore.BlobStore;
@@ -268,7 +269,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
268269
metadata.name(),
269270
storageService,
270271
bigArrays,
271-
randomIntBetween(1, 8) * 1024
272+
randomIntBetween(1, 8) * 1024,
273+
BackoffPolicy.noBackoff()
272274
) {
273275
@Override
274276
long getLargeBlobThresholdInBytes() {

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.logging.log4j.LogManager;
2525
import org.apache.logging.log4j.Logger;
2626
import org.elasticsearch.ExceptionsHelper;
27+
import org.elasticsearch.common.BackoffPolicy;
2728
import org.elasticsearch.common.blobstore.BlobContainer;
2829
import org.elasticsearch.common.blobstore.BlobPath;
2930
import org.elasticsearch.common.blobstore.BlobStore;
@@ -41,6 +42,7 @@
4142
import org.elasticsearch.core.CheckedConsumer;
4243
import org.elasticsearch.core.Streams;
4344
import org.elasticsearch.core.SuppressForbidden;
45+
import org.elasticsearch.core.TimeValue;
4446
import org.elasticsearch.rest.RestStatus;
4547

4648
import java.io.ByteArrayInputStream;
@@ -105,14 +107,16 @@ class GoogleCloudStorageBlobStore implements BlobStore {
105107
private final GoogleCloudStorageOperationsStats stats;
106108
private final int bufferSize;
107109
private final BigArrays bigArrays;
110+
private final BackoffPolicy casBackoffPolicy;
108111

109112
GoogleCloudStorageBlobStore(
110113
String bucketName,
111114
String clientName,
112115
String repositoryName,
113116
GoogleCloudStorageService storageService,
114117
BigArrays bigArrays,
115-
int bufferSize
118+
int bufferSize,
119+
BackoffPolicy casBackoffPolicy
116120
) {
117121
this.bucketName = bucketName;
118122
this.clientName = clientName;
@@ -121,6 +125,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
121125
this.bigArrays = bigArrays;
122126
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
123127
this.bufferSize = bufferSize;
128+
this.casBackoffPolicy = casBackoffPolicy;
124129
}
125130

126131
private Storage client() throws IOException {
@@ -691,28 +696,46 @@ OptionalBytesReference compareAndExchangeRegister(
691696
.setMd5(Base64.getEncoder().encodeToString(MessageDigests.digest(updated, MessageDigests.md5())))
692697
.build();
693698
final var bytesRef = updated.toBytesRef();
694-
try {
695-
SocketAccess.doPrivilegedVoidIOException(
696-
() -> client().create(
697-
blobInfo,
698-
bytesRef.bytes,
699-
bytesRef.offset,
700-
bytesRef.length,
701-
Storage.BlobTargetOption.generationMatch()
702-
)
703-
);
704-
} catch (Exception e) {
705-
final var serviceException = unwrapServiceException(e);
706-
if (serviceException != null) {
699+
700+
final Iterator<TimeValue> retries = casBackoffPolicy.iterator();
701+
BaseServiceException finalException = null;
702+
while (true) {
703+
try {
704+
SocketAccess.doPrivilegedVoidIOException(
705+
() -> client().create(
706+
blobInfo,
707+
bytesRef.bytes,
708+
bytesRef.offset,
709+
bytesRef.length,
710+
Storage.BlobTargetOption.generationMatch()
711+
)
712+
);
713+
return OptionalBytesReference.of(expected);
714+
} catch (Exception e) {
715+
final var serviceException = unwrapServiceException(e);
716+
if (serviceException == null) {
717+
throw e;
718+
}
707719
final var statusCode = serviceException.getCode();
708-
if (statusCode == RestStatus.PRECONDITION_FAILED.getStatus() || statusCode == RestStatus.TOO_MANY_REQUESTS.getStatus()) {
720+
if (statusCode == RestStatus.PRECONDITION_FAILED.getStatus()) {
709721
return OptionalBytesReference.MISSING;
710722
}
723+
if (statusCode == RestStatus.TOO_MANY_REQUESTS.getStatus()) {
724+
finalException = ExceptionsHelper.useOrSuppress(finalException, serviceException);
725+
if (retries.hasNext()) {
726+
try {
727+
// noinspection BusyWait
728+
Thread.sleep(retries.next().millis());
729+
} catch (InterruptedException iex) {
730+
Thread.currentThread().interrupt();
731+
finalException.addSuppressed(iex);
732+
}
733+
} else {
734+
throw finalException;
735+
}
736+
}
711737
}
712-
throw e;
713738
}
714-
715-
return OptionalBytesReference.of(expected);
716739
}
717740

718741
private static BaseServiceException unwrapServiceException(Throwable t) {

modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
import org.apache.logging.log4j.Logger;
1414
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
1515
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.common.BackoffPolicy;
1617
import org.elasticsearch.common.Strings;
1718
import org.elasticsearch.common.blobstore.BlobPath;
1819
import org.elasticsearch.common.settings.Setting;
1920
import org.elasticsearch.common.unit.ByteSizeUnit;
2021
import org.elasticsearch.common.unit.ByteSizeValue;
2122
import org.elasticsearch.common.util.BigArrays;
23+
import org.elasticsearch.core.TimeValue;
2224
import org.elasticsearch.indices.recovery.RecoverySettings;
2325
import org.elasticsearch.repositories.RepositoryException;
2426
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
@@ -56,10 +58,33 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
5658
);
5759
static final Setting<String> CLIENT_NAME = Setting.simpleString("client", "default");
5860

61+
/**
62+
* We will retry CASes that fail due to throttling. We use a {@link BackoffPolicy#linearBackoff(TimeValue, int, TimeValue)}
63+
* with the following parameters:
64+
*/
65+
static final Setting<TimeValue> RETRY_THROTTLED_CAS_DELAY_INCREMENT = Setting.timeSetting(
66+
"throttled_cas_retry.delay_increment",
67+
TimeValue.timeValueMillis(100),
68+
TimeValue.ZERO
69+
);
70+
static final Setting<Integer> RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES = Setting.intSetting(
71+
"throttled_cas_retry.maximum_number_of_retries",
72+
2,
73+
0
74+
);
75+
static final Setting<TimeValue> RETRY_THROTTLED_CAS_MAXIMUM_DELAY = Setting.timeSetting(
76+
"throttled_cas_retry.maximum_delay",
77+
TimeValue.timeValueSeconds(5),
78+
TimeValue.ZERO
79+
);
80+
5981
private final GoogleCloudStorageService storageService;
6082
private final ByteSizeValue chunkSize;
6183
private final String bucket;
6284
private final String clientName;
85+
private final TimeValue retryThrottledCasDelayIncrement;
86+
private final int retryThrottledCasMaxNumberOfRetries;
87+
private final TimeValue retryThrottledCasMaxDelay;
6388

6489
GoogleCloudStorageRepository(
6590
final RepositoryMetadata metadata,
@@ -83,6 +108,9 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
83108
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
84109
this.bucket = getSetting(BUCKET, metadata);
85110
this.clientName = CLIENT_NAME.get(metadata.settings());
111+
this.retryThrottledCasDelayIncrement = RETRY_THROTTLED_CAS_DELAY_INCREMENT.get(metadata.settings());
112+
this.retryThrottledCasMaxNumberOfRetries = RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES.get(metadata.settings());
113+
this.retryThrottledCasMaxDelay = RETRY_THROTTLED_CAS_MAXIMUM_DELAY.get(metadata.settings());
86114
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath(), chunkSize, isCompress());
87115
}
88116

@@ -105,7 +133,15 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
105133

106134
@Override
107135
protected GoogleCloudStorageBlobStore createBlobStore() {
108-
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bigArrays, bufferSize);
136+
return new GoogleCloudStorageBlobStore(
137+
bucket,
138+
clientName,
139+
metadata.name(),
140+
storageService,
141+
bigArrays,
142+
bufferSize,
143+
BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay)
144+
);
109145
}
110146

111147
@Override

0 commit comments

Comments
 (0)