Merged
Changes from 4 commits
6 changes: 6 additions & 0 deletions docs/changelog/123630.yaml
@@ -0,0 +1,6 @@
pr: 123630
summary: Limit number of suppressed S3 deletion errors
area: Snapshot/Restore
type: bug
issues:
- 123354
S3BlobStore.java
@@ -52,7 +52,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

@@ -69,6 +68,8 @@ class S3BlobStore implements BlobStore {
*/
static final int MAX_BULK_DELETES = 1000;

static final int MAX_DELETE_EXCEPTIONS = 10;
Member Author: I've picked this more or less randomly. Not sure what it should be; I'm not sure having more than this is useful.
Contributor: 👍 fine by me
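For reference, a minimal sketch of the assumed semantics of ExceptionsHelper.useOrSuppress, which the DeletionExceptions helper below builds on (illustrative, not the verbatim Elasticsearch implementation): the first exception becomes the primary one and later ones are chained onto it as suppressed, each staying reachable in memory, which is what this cap bounds.

static <T extends Throwable> T useOrSuppress(T first, T second) {
    if (first == null) {
        return second; // nothing recorded yet: the new exception becomes the primary one
    }
    first.addSuppressed(second); // otherwise chain it onto the primary exception
    return first;
}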


private static final Logger logger = LogManager.getLogger(S3BlobStore.class);

private final S3Service service;
@@ -340,6 +341,18 @@ public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
}

/**
 * Tracks the first exception seen during a bulk delete and suppresses subsequent ones,
 * capped at {@code MAX_DELETE_EXCEPTIONS} so a large failed deletion cannot build up an
 * unbounded chain of suppressed exceptions.
 */
private static class DeletionExceptions {
    private Exception exception = null;
    private int count = 0;

    void useOrMaybeSuppress(Exception e) {
        if (count < MAX_DELETE_EXCEPTIONS) {
            exception = ExceptionsHelper.useOrSuppress(exception, e);
            count++;
        }
    }
}

void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
if (blobNames.hasNext() == false) {
return;
@@ -348,19 +361,19 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
final List<String> partition = new ArrayList<>();
try {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final AtomicReference<Exception> aex = new AtomicReference<>();
final var deletionExceptions = new DeletionExceptions();
blobNames.forEachRemaining(key -> {
partition.add(key);
if (partition.size() == bulkDeletionBatchSize) {
deletePartition(purpose, partition, aex);
deletePartition(purpose, partition, deletionExceptions);
partition.clear();
}
});
if (partition.isEmpty() == false) {
deletePartition(purpose, partition, aex);
deletePartition(purpose, partition, deletionExceptions);
}
if (aex.get() != null) {
throw aex.get();
if (deletionExceptions.exception != null) {
throw deletionExceptions.exception;
}
} catch (Exception e) {
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
@@ -372,9 +385,9 @@ void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IO
*
* @param purpose The {@link OperationPurpose} of the deletion
* @param partition The list of blobs to delete
* @param aex A holder for any exception(s) thrown during the deletion
* @param deletionExceptions A holder for any exception(s) thrown during the deletion
*/
private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
private void deletePartition(OperationPurpose purpose, List<String> partition, DeletionExceptions deletionExceptions) {
final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
int retryCounter = 0;
while (true) {
@@ -395,7 +408,7 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A
),
e
);
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
deletionExceptions.useOrMaybeSuppress(e);
return;
} catch (AmazonClientException e) {
if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
@@ -404,13 +417,13 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, A
retryCounter++;
} else {
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
deletionExceptions.useOrMaybeSuppress(e);
return;
}
} else {
// The AWS client threw an unexpected exception and did not execute the request at all, so we do not
// remove any keys from the outstanding deletes set.
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
deletionExceptions.useOrMaybeSuppress(e);
return;
}
}
S3BlobContainerRetriesTests.java
@@ -54,6 +54,7 @@
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.hamcrest.Matcher;
import org.junit.After;
@@ -162,6 +163,16 @@ protected BlobContainer createBlobContainer(
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize
) {
return createBlobContainer(maxRetries, readTimeout, disableChunkedEncoding, bufferSize, null);
}

protected BlobContainer createBlobContainer(
final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable ByteSizeValue bufferSize,
final @Nullable Integer maxBulkDeletes
Contributor: I'd rather just have the one method rather than having to work out which variant is being called (and what the implied default params are) in each case. I know it adds a bunch of noise here but it'll be better in the long run.
Member Author: yeah, mostly because of the noise and the fact that this way of passing these arguments is terrible to extend. Speaking of the long run, how about replacing these with a parameter object?
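A hypothetical sketch of that parameter-object idea (all names here are illustrative and not from this PR; the types are the ones this test file already imports):

// Hypothetical: bundles the nullable test knobs so adding one doesn't force another overload.
record BlobContainerOptions(
    @Nullable Integer maxRetries,
    @Nullable TimeValue readTimeout,
    @Nullable Boolean disableChunkedEncoding,
    @Nullable ByteSizeValue bufferSize,
    @Nullable Integer maxBulkDeletes
) {
    static BlobContainerOptions defaults() {
        return new BlobContainerOptions(null, null, null, null, null);
    }

    BlobContainerOptions withMaxBulkDeletes(int maxBulkDeletes) {
        return new BlobContainerOptions(maxRetries, readTimeout, disableChunkedEncoding, bufferSize, maxBulkDeletes);
    }
}

A call site would then read createBlobContainer(BlobContainerOptions.defaults().withMaxBulkDeletes(5)) instead of a row of positional nulls.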

Contributor: Some kind of builder would seem in order here, but maybe that can be deferred to a followup. Or do that first, reducing the noise in this PR. But either way, let's not have two methods that do almost the same thing.
) {
final Settings.Builder clientSettings = Settings.builder();
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
@@ -192,14 +203,13 @@ protected BlobContainer createBlobContainer(
clientSettings.setSecureSettings(secureSettings);
service.refreshAndClearCache(S3ClientSettings.load(clientSettings.build()));

final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
"repository",
S3Repository.TYPE,
Settings.builder()
.put(S3Repository.CLIENT_NAME.getKey(), clientName)
.put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO)
.build()
);
final var repositorySettings = Settings.builder()
.put(S3Repository.CLIENT_NAME.getKey(), clientName)
.put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO);
if (maxBulkDeletes != null) {
repositorySettings.put(S3Repository.DELETION_BATCH_SIZE_SETTING.getKey(), maxBulkDeletes);
}
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());

final S3BlobStore s3BlobStore = new S3BlobStore(
service,
@@ -1073,6 +1083,38 @@ interface FailingHandlerFactory {
}
}

public void testSuppressedDeletionErrorsAreCapped() {
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
int maxBulkDeleteSize = randomIntBetween(1, 10);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
httpServer.createContext("/", exchange -> {
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
exchange.sendResponseHeaders(
randomFrom(
HttpStatus.SC_INTERNAL_SERVER_ERROR,
HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE,
HttpStatus.SC_GATEWAY_TIMEOUT,
HttpStatus.SC_NOT_FOUND,
HttpStatus.SC_UNAUTHORIZED
),
-1
);
exchange.close();
} else {
fail("expected only deletions");
}
});
var maxNoOfDeletions = 2 * S3BlobStore.MAX_DELETE_EXCEPTIONS;
var blobs = randomList(1, maxNoOfDeletions * maxBulkDeleteSize, ESTestCase::randomIdentifier);
var exception = expectThrows(
IOException.class,
"deletion should not succeed",
() -> blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator())
);
assertThat(exception.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS));
}

@Override
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
// some attempts make meaningful progress and do not count towards the max retry limit