Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,19 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes

private static final int MAX_NUMBER_SNAPSHOT_DELETE_RETRIES = 10;
private S3Service service;
private AtomicBoolean shouldErrorOnDns;
private volatile boolean shouldErrorOnDns;
private RecordingMeterRegistry recordingMeterRegistry;

@Before
public void setUp() throws Exception {
shouldErrorOnDns = new AtomicBoolean(false);
shouldErrorOnDns = false;
service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY, Mockito.mock(ResourceWatcherService.class)) {
@Override
protected AmazonS3ClientBuilder buildClientBuilder(S3ClientSettings clientSettings) {
final AmazonS3ClientBuilder builder = super.buildClientBuilder(clientSettings);
final DnsResolver defaultDnsResolver = builder.getClientConfiguration().getDnsResolver();
builder.getClientConfiguration().setDnsResolver(host -> {
if (shouldErrorOnDns.get() && randomBoolean() && randomBoolean()) {
if (shouldErrorOnDns && randomBoolean() && randomBoolean()) {
throw new UnknownHostException(host);
}
return defaultDnsResolver.resolve(host);
Expand Down Expand Up @@ -667,7 +667,7 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException {

final byte[] bytes = randomBlobContent(512);

shouldErrorOnDns.set(true);
shouldErrorOnDns = true;
final AtomicInteger failures = new AtomicInteger();
@SuppressForbidden(reason = "use a http server")
class FlakyReadHandler implements HttpHandler {
Expand Down Expand Up @@ -895,7 +895,7 @@ private int expectedNumberOfBatches(int blobsToDelete) {
}

@SuppressForbidden(reason = "use a http server")
private class ThrottlingDeleteHandler extends S3HttpHandler {
private static class ThrottlingDeleteHandler extends S3HttpHandler {

private static final String THROTTLING_ERROR_CODE = "SlowDown";

Expand All @@ -920,7 +920,7 @@ private class ThrottlingDeleteHandler extends S3HttpHandler {

@Override
public void handle(HttpExchange exchange) throws IOException {
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
if (isMultiDeleteRequest(exchange)) {
onAttemptCallback.accept(numberOfDeleteAttempts.get());
numberOfDeleteAttempts.incrementAndGet();
if (throttleTimesBeforeSuccess.getAndDecrement() > 0) {
Expand Down Expand Up @@ -1024,7 +1024,7 @@ public void testSuppressedDeletionErrorsAreCapped() {
int maxBulkDeleteSize = randomIntBetween(1, 10);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
httpServer.createContext("/", exchange -> {
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
if (isMultiDeleteRequest(exchange)) {
exchange.sendResponseHeaders(
randomFrom(
HttpStatus.SC_INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -1058,7 +1058,7 @@ public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException

final Pattern pattern = Pattern.compile("<Key>(.+?)</Key>");
httpServer.createContext("/", exchange -> {
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
if (isMultiDeleteRequest(exchange)) {
final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
final var matcher = pattern.matcher(requestBody);
final StringBuilder deletes = new StringBuilder();
Expand Down Expand Up @@ -1195,6 +1195,10 @@ private Map<String, Object> metricAttributes(String action) {
return Map.of("repo_type", "s3", "repo_name", "repository", "operation", "GetObject", "purpose", "Indices", "action", action);
}

private static boolean isMultiDeleteRequest(HttpExchange exchange) {
return new S3HttpHandler("bucket").parseRequest(exchange).isMultiObjectDeleteRequest();
}

/**
* Asserts that an InputStream is fully consumed, or aborted, when it is closed
*/
Expand Down