2020import org .elasticsearch .common .collect .Iterators ;
2121import org .elasticsearch .common .settings .Settings ;
2222import org .elasticsearch .core .SuppressForbidden ;
23+ import org .elasticsearch .core .TimeValue ;
2324import org .elasticsearch .plugins .PluginsService ;
2425import org .elasticsearch .repositories .RepositoriesService ;
2526import org .elasticsearch .repositories .blobstore .BlobStoreRepository ;
3132import org .elasticsearch .test .ESIntegTestCase ;
3233
3334import java .io .IOException ;
35+ import java .nio .charset .StandardCharsets ;
36+ import java .util .ArrayList ;
3437import java .util .Arrays ;
3538import java .util .Collections ;
3639import java .util .List ;
3740import java .util .Map ;
3841import java .util .Queue ;
3942import java .util .concurrent .LinkedBlockingQueue ;
4043import java .util .concurrent .TimeUnit ;
44+ import java .util .stream .IntStream ;
4145
4246import static org .elasticsearch .repositories .RepositoriesMetrics .HTTP_REQUEST_TIME_IN_MILLIS_HISTOGRAM ;
4347import static org .elasticsearch .repositories .RepositoriesMetrics .METRIC_EXCEPTIONS_HISTOGRAM ;
4852import static org .elasticsearch .repositories .RepositoriesMetrics .METRIC_THROTTLES_HISTOGRAM ;
4953import static org .elasticsearch .repositories .RepositoriesMetrics .METRIC_THROTTLES_TOTAL ;
5054import static org .elasticsearch .repositories .RepositoriesMetrics .METRIC_UNSUCCESSFUL_OPERATIONS_TOTAL ;
55+ import static org .elasticsearch .repositories .s3 .S3RepositoriesMetrics .METRIC_DELETE_RETRIES_HISTOGRAM ;
5156import static org .elasticsearch .rest .RestStatus .INTERNAL_SERVER_ERROR ;
5257import static org .elasticsearch .rest .RestStatus .NOT_FOUND ;
5358import static org .elasticsearch .rest .RestStatus .REQUESTED_RANGE_NOT_SATISFIED ;
59+ import static org .elasticsearch .rest .RestStatus .SERVICE_UNAVAILABLE ;
5460import static org .elasticsearch .rest .RestStatus .TOO_MANY_REQUESTS ;
5561import static org .hamcrest .Matchers .equalTo ;
5662import static org .hamcrest .Matchers .instanceOf ;
6167@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
6268public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTests {
6369
64- private final Queue <RestStatus > errorStatusQueue = new LinkedBlockingQueue <>();
70+ private static final S3ErrorResponse S3_SLOW_DOWN_RESPONSE = new S3ErrorResponse (SERVICE_UNAVAILABLE , """
71+ <?xml version="1.0" encoding="UTF-8"?>
72+ <Error>
73+ <Code>SlowDown</Code>
74+ <Message>This is a throttling message</Message>
75+ <Resource>/bucket/</Resource>
76+ <RequestId>4442587FB7D0A2F9</RequestId>
77+ </Error>""" );
78+ private final Queue <S3ErrorResponse > errorResponseQueue = new LinkedBlockingQueue <>();
6579
6680 // Always create erroneous handler
6781 @ Override
6882 protected Map <String , HttpHandler > createHttpHandlers () {
6983 return Collections .singletonMap (
7084 "/bucket" ,
71- new S3StatsCollectorHttpHandler (new S3MetricErroneousHttpHandler (new S3BlobStoreHttpHandler ("bucket" ), errorStatusQueue ))
85+ new S3StatsCollectorHttpHandler (new S3MetricErroneousHttpHandler (new S3BlobStoreHttpHandler ("bucket" ), errorResponseQueue ))
7286 );
7387 }
7488
@@ -244,8 +258,74 @@ public void testMetricsForRequestRangeNotSatisfied() {
244258 }
245259 }
246260
261+ public void testRetrySnapshotDeleteMetricsOnEventualSuccess () throws IOException {
262+ final int maxRetries = 5 ;
263+ final String repositoryName = randomRepositoryName ();
264+ // Disable retries in the client for this repo
265+ createRepository (
266+ repositoryName ,
267+ Settings .builder ()
268+ .put (repositorySettings (repositoryName ))
269+ .put (S3ClientSettings .MAX_RETRIES_SETTING .getConcreteSettingForNamespace ("placeholder" ).getKey (), 0 )
270+ .put (S3Repository .RETRY_THROTTLED_DELETE_DELAY_INCREMENT .getKey (), TimeValue .timeValueMillis (10 ))
271+ .put (S3Repository .RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES .getKey (), maxRetries )
272+ .build (),
273+ false
274+ );
275+ final String dataNodeName = internalCluster ().getNodeNameThat (DiscoveryNode ::canContainData );
276+ final BlobContainer blobContainer = getBlobContainer (dataNodeName , repositoryName );
277+ final TestTelemetryPlugin plugin = getPlugin (dataNodeName );
278+ final int numberOfDeletes = randomIntBetween (1 , 3 );
279+ final List <Long > numberOfRetriesPerAttempt = new ArrayList <>();
280+ for (int i = 0 ; i < numberOfDeletes ; i ++) {
281+ int numFailures = randomIntBetween (1 , maxRetries );
282+ numberOfRetriesPerAttempt .add ((long ) numFailures );
283+ IntStream .range (0 , numFailures ).forEach (ignored -> addErrorStatus (S3_SLOW_DOWN_RESPONSE ));
284+ blobContainer .deleteBlobsIgnoringIfNotExists (
285+ randomFrom (OperationPurpose .SNAPSHOT_DATA , OperationPurpose .SNAPSHOT_METADATA ),
286+ List .of (randomIdentifier ()).iterator ()
287+ );
288+ }
289+ List <Measurement > longHistogramMeasurement = plugin .getLongHistogramMeasurement (METRIC_DELETE_RETRIES_HISTOGRAM );
290+ assertThat (longHistogramMeasurement .stream ().map (Measurement ::getLong ).toList (), equalTo (numberOfRetriesPerAttempt ));
291+ }
292+
293+ public void testRetrySnapshotDeleteMetricsWhenRetriesExhausted () {
294+ final String repositoryName = randomRepositoryName ();
295+ // Disable retries in the client for this repo
296+ int maxRetries = 3 ;
297+ createRepository (
298+ repositoryName ,
299+ Settings .builder ()
300+ .put (repositorySettings (repositoryName ))
301+ .put (S3ClientSettings .MAX_RETRIES_SETTING .getConcreteSettingForNamespace ("placeholder" ).getKey (), 0 )
302+ .put (S3Repository .RETRY_THROTTLED_DELETE_DELAY_INCREMENT .getKey (), TimeValue .timeValueMillis (10 ))
303+ .put (S3Repository .RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES .getKey (), maxRetries )
304+ .build (),
305+ false
306+ );
307+ final String dataNodeName = internalCluster ().getNodeNameThat (DiscoveryNode ::canContainData );
308+ final BlobContainer blobContainer = getBlobContainer (dataNodeName , repositoryName );
309+ final TestTelemetryPlugin plugin = getPlugin (dataNodeName );
310+ // Keep throttling past the max number of retries
311+ IntStream .range (0 , maxRetries + 1 ).forEach (ignored -> addErrorStatus (S3_SLOW_DOWN_RESPONSE ));
312+ assertThrows (
313+ IOException .class ,
314+ () -> blobContainer .deleteBlobsIgnoringIfNotExists (
315+ randomFrom (OperationPurpose .SNAPSHOT_DATA , OperationPurpose .SNAPSHOT_METADATA ),
316+ List .of (randomIdentifier ()).iterator ()
317+ )
318+ );
319+ List <Measurement > longHistogramMeasurement = plugin .getLongHistogramMeasurement (METRIC_DELETE_RETRIES_HISTOGRAM );
320+ assertThat (longHistogramMeasurement .get (0 ).getLong (), equalTo (3L ));
321+ }
322+
247323 private void addErrorStatus (RestStatus ... statuses ) {
248- errorStatusQueue .addAll (Arrays .asList (statuses ));
324+ errorResponseQueue .addAll (Arrays .stream (statuses ).map (S3ErrorResponse ::new ).toList ());
325+ }
326+
327+ private void addErrorStatus (S3ErrorResponse ... responses ) {
328+ errorResponseQueue .addAll (Arrays .asList (responses ));
249329 }
250330
251331 private long getLongCounterValue (TestTelemetryPlugin plugin , String instrumentName , Operation operation ) {
@@ -275,25 +355,25 @@ private long getLongHistogramValue(TestTelemetryPlugin plugin, String instrument
275355 private static class S3MetricErroneousHttpHandler implements DelegatingHttpHandler {
276356
/** Handler that serves a request when no error response is queued for it. */
private final HttpHandler delegate;
/** Pending error responses; {@code handle} polls one per incoming request. */
private final Queue<S3ErrorResponse> errorResponseQueue;

/**
 * @param delegate           handler invoked when the queue is empty
 * @param errorResponseQueue queue of error responses to replay; kept by reference, not copied
 */
S3MetricErroneousHttpHandler(HttpHandler delegate, Queue<S3ErrorResponse> errorResponseQueue) {
    this.errorResponseQueue = errorResponseQueue;
    this.delegate = delegate;
}
284364
285365 @ Override
286366 public void handle (HttpExchange exchange ) throws IOException {
287- final RestStatus status = errorStatusQueue .poll ();
288- if (status == null ) {
367+ final S3ErrorResponse errorResponse = errorResponseQueue .poll ();
368+ if (errorResponse == null ) {
289369 delegate .handle (exchange );
290- } else if (status == INTERNAL_SERVER_ERROR ) {
370+ } else if (errorResponse . status == INTERNAL_SERVER_ERROR ) {
291371 // Simulate an retryable exception
292372 throw new IOException ("ouch" );
293373 } else {
294374 try (exchange ) {
295375 drainInputStream (exchange .getRequestBody ());
296- exchange . sendResponseHeaders ( status . getStatus (), - 1 );
376+ errorResponse . writeResponse ( exchange );
297377 }
298378 }
299379 }
@@ -302,4 +382,22 @@ public HttpHandler getDelegate() {
302382 return delegate ;
303383 }
304384 }
385+
386+ record S3ErrorResponse (RestStatus status , String responseBody ) {
387+
388+ S3ErrorResponse (RestStatus status ) {
389+ this (status , null );
390+ }
391+
392+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate an S3 endpoint" )
393+ public void writeResponse (HttpExchange exchange ) throws IOException {
394+ if (responseBody != null ) {
395+ byte [] responseBytes = responseBody .getBytes (StandardCharsets .UTF_8 );
396+ exchange .sendResponseHeaders (status .getStatus (), responseBytes .length );
397+ exchange .getResponseBody ().write (responseBytes );
398+ } else {
399+ exchange .sendResponseHeaders (status .getStatus (), -1 );
400+ }
401+ }
402+ }
305403}
0 commit comments