- 
                Notifications
    
You must be signed in to change notification settings  - Fork 25.6k
 
Limit number of suppressed S3 deletion errors #123630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
80cbf72
              296fa18
              2a9f23b
              6b85c33
              6662a8b
              9b3dac4
              80d6bd4
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 123630 | ||
| summary: Limit number of suppressed S3 deletion errors | ||
| area: Snapshot/Restore | ||
| type: bug | ||
| issues: | ||
| - 123354 | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -69,6 +69,8 @@ class S3BlobStore implements BlobStore { | |
| */ | ||
| static final int MAX_BULK_DELETES = 1000; | ||
| 
     | 
||
| static final int MAX_DELETE_EXCEPTIONS = 10; | ||
| 
     | 
||
| private static final Logger logger = LogManager.getLogger(S3BlobStore.class); | ||
| 
     | 
||
| private final S3Service service; | ||
| 
          
            
          
           | 
    @@ -340,6 +342,8 @@ public BlobContainer blobContainer(BlobPath path) { | |
| return new S3BlobContainer(path, this); | ||
| } | ||
| 
     | 
||
| record DeletionExceptions(Exception exception, int count) {} | ||
                
       | 
||
| 
     | 
||
| void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException { | ||
| if (blobNames.hasNext() == false) { | ||
| return; | ||
| 
        
          
        
         | 
    @@ -348,7 +352,7 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO | |
| final List<String> partition = new ArrayList<>(); | ||
| try { | ||
| // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes | ||
| final AtomicReference<Exception> aex = new AtomicReference<>(); | ||
| final AtomicReference<DeletionExceptions> aex = new AtomicReference<>(new DeletionExceptions(null, 0)); | ||
| blobNames.forEachRemaining(key -> { | ||
| partition.add(key); | ||
| if (partition.size() == bulkDeletionBatchSize) { | ||
| 
        
          
        
         | 
    @@ -359,8 +363,8 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO | |
| if (partition.isEmpty() == false) { | ||
| deletePartition(purpose, partition, aex); | ||
| } | ||
| if (aex.get() != null) { | ||
| throw aex.get(); | ||
| if (aex.get().exception != null) { | ||
| throw aex.get().exception; | ||
| } | ||
| } catch (Exception e) { | ||
| throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e); | ||
| 
        
          
        
         | 
    @@ -374,7 +378,7 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO | |
| * @param partition The list of blobs to delete | ||
| * @param aex A holder for any exception(s) thrown during the deletion | ||
| */ | ||
| private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) { | ||
| private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<DeletionExceptions> aex) { | ||
| final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator(); | ||
| int retryCounter = 0; | ||
| while (true) { | ||
| 
        
          
        
         | 
    @@ -395,7 +399,7 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A | |
| ), | ||
| e | ||
| ); | ||
| aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e)); | ||
| useOrMaybeSuppress(aex, e); | ||
| return; | ||
| } catch (AmazonClientException e) { | ||
| if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) { | ||
| 
        
          
        
         | 
    @@ -404,19 +408,26 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A | |
| retryCounter++; | ||
| } else { | ||
| s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter); | ||
| aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e)); | ||
| useOrMaybeSuppress(aex, e); | ||
| return; | ||
| } | ||
| } else { | ||
| // The AWS client threw any unexpected exception and did not execute the request at all so we do not | ||
| // remove any keys from the outstanding deletes set. | ||
| aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e)); | ||
| useOrMaybeSuppress(aex, e); | ||
| return; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| 
     | 
||
| private void useOrMaybeSuppress(AtomicReference<DeletionExceptions> aex, Exception e) { | ||
| var deletionExceptions = aex.get(); | ||
| if (deletionExceptions.count < MAX_DELETE_EXCEPTIONS) { | ||
| aex.set(new DeletionExceptions(ExceptionsHelper.useOrSuppress(deletionExceptions.exception, e), deletionExceptions.count + 1)); | ||
| } | ||
| } | ||
| 
     | 
||
| /** | ||
| * If there are remaining retries, pause for the configured interval then return true | ||
| * | ||
| 
          
            
          
           | 
    ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -54,6 +54,7 @@ | |
| import org.elasticsearch.telemetry.InstrumentType; | ||
| import org.elasticsearch.telemetry.Measurement; | ||
| import org.elasticsearch.telemetry.RecordingMeterRegistry; | ||
| import org.elasticsearch.test.ESTestCase; | ||
| import org.elasticsearch.watcher.ResourceWatcherService; | ||
| import org.hamcrest.Matcher; | ||
| import org.junit.After; | ||
| 
          
            
          
           | 
    @@ -162,6 +163,16 @@ protected BlobContainer createBlobContainer( | |
| final @Nullable TimeValue readTimeout, | ||
| final @Nullable Boolean disableChunkedEncoding, | ||
| final @Nullable ByteSizeValue bufferSize | ||
| ) { | ||
| return createBlobContainer(maxRetries, readTimeout, disableChunkedEncoding, bufferSize, null); | ||
| } | ||
| 
     | 
||
| protected BlobContainer createBlobContainer( | ||
| final @Nullable Integer maxRetries, | ||
| final @Nullable TimeValue readTimeout, | ||
| final @Nullable Boolean disableChunkedEncoding, | ||
| final @Nullable ByteSizeValue bufferSize, | ||
| final @Nullable Integer maxBulkDeletes | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather just have the one method rather than having to work out which variant is being called (and what the implied default params) are in each case. I know it adds a bunch of noise here but it'll be better in the long run. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, mostly because of the noise and the fact that this way of passing these arguments is terrible to extend. Speaking of the long-run, how about replacing these with a parameter object? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some kind of builder would seem in order here, but maybe that can be deferred to a followup. Or do that first, reducing the noise in this PR. But either way, let's not have two methods that do almost the same thing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.  | 
||
| ) { | ||
| final Settings.Builder clientSettings = Settings.builder(); | ||
| final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); | ||
| 
          
            
          
           | 
    @@ -192,14 +203,13 @@ protected BlobContainer createBlobContainer( | |
| clientSettings.setSecureSettings(secureSettings); | ||
| service.refreshAndClearCache(S3ClientSettings.load(clientSettings.build())); | ||
| 
     | 
||
| final RepositoryMetadata repositoryMetadata = new RepositoryMetadata( | ||
| "repository", | ||
| S3Repository.TYPE, | ||
| Settings.builder() | ||
| .put(S3Repository.CLIENT_NAME.getKey(), clientName) | ||
| .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO) | ||
| .build() | ||
| ); | ||
| final var repositorySettings = Settings.builder() | ||
| .put(S3Repository.CLIENT_NAME.getKey(), clientName) | ||
| .put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO); | ||
| if (maxBulkDeletes != null) { | ||
| repositorySettings.put(S3Repository.DELETION_BATCH_SIZE_SETTING.getKey(), maxBulkDeletes); | ||
| } | ||
| final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build()); | ||
| 
     | 
||
| final S3BlobStore s3BlobStore = new S3BlobStore( | ||
| service, | ||
| 
          
            
          
           | 
    @@ -1073,6 +1083,38 @@ interface FailingHandlerFactory { | |
| } | ||
| } | ||
| 
     | 
||
| public void testSuppressedDeletionErrorsAreCapped() { | ||
| final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); | ||
| int maxBulkDeleteSize = randomIntBetween(1, 10); | ||
| final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize); | ||
| httpServer.createContext("/", exchange -> { | ||
| if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) { | ||
| exchange.sendResponseHeaders( | ||
| randomFrom( | ||
| HttpStatus.SC_INTERNAL_SERVER_ERROR, | ||
| HttpStatus.SC_BAD_GATEWAY, | ||
| HttpStatus.SC_SERVICE_UNAVAILABLE, | ||
| HttpStatus.SC_GATEWAY_TIMEOUT, | ||
| HttpStatus.SC_NOT_FOUND, | ||
| HttpStatus.SC_UNAUTHORIZED | ||
| ), | ||
| -1 | ||
| ); | ||
| exchange.close(); | ||
| } else { | ||
| fail("expected only deletions"); | ||
| } | ||
| }); | ||
| try { | ||
| var maxNoOfDeletions = 2 * S3BlobStore.MAX_DELETE_EXCEPTIONS; | ||
| var blobs = randomList(1, maxNoOfDeletions * maxBulkDeleteSize, ESTestCase::randomIdentifier); | ||
| blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator()); | ||
| fail("deletion should not succeed"); | ||
                
       | 
||
| } catch (IOException e) { | ||
| assertThat(e.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS)); | ||
| } | ||
| } | ||
| 
     | 
||
| @Override | ||
| protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) { | ||
| // some attempts make meaningful progress and do not count towards the max retry limit | ||
| 
          
            
          
           | 
    ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've picked this randomly more or less. Not sure what it should be. I'm not sure having more than this is useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 fine by me