Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ protected BlobContainer createBlobContainer(
final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable Integer maxConnections,
final @Nullable ByteSizeValue bufferSize,
final @Nullable Integer maxBulkDeletes,
final @Nullable BlobPath blobContainerPath
Expand Down Expand Up @@ -176,7 +177,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final AtomicInteger countDown = new AtomicInteger(maxRetries);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build();

// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
final byte[] bytes = randomBytes(1 << 22);
Expand Down Expand Up @@ -205,7 +206,7 @@ public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build();
final byte[] bytes = randomBlobContent();
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
Expand Down Expand Up @@ -247,7 +248,7 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1).readTimeout(readTimeout).build();

// HTTP server does not send a response
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
Expand Down Expand Up @@ -300,7 +301,7 @@ public void testWriteLargeBlob() throws IOException {
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());

final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(nbErrors + 1).readTimeout(readTimeout).build();

httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
Expand Down Expand Up @@ -440,7 +441,7 @@ public String next() {
return Integer.toString(totalDeletesSent++);
}
};
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1).build();
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_CONNECTIONS_SETTING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -154,6 +155,7 @@ protected BlobContainer createBlobContainer(
final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable Integer maxConnections,
final @Nullable ByteSizeValue bufferSize,
final @Nullable Integer maxBulkDeletes,
final @Nullable BlobPath blobContainerPath
Expand All @@ -174,6 +176,9 @@ protected BlobContainer createBlobContainer(
if (disableChunkedEncoding != null) {
clientSettings.put(DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace(clientName).getKey(), disableChunkedEncoding);
}
if (maxConnections != null) {
clientSettings.put(MAX_CONNECTIONS_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxConnections);
}

final MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(
Expand Down Expand Up @@ -253,7 +258,7 @@ public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).disableChunkedEncoding(true).build();

final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
Expand Down Expand Up @@ -301,7 +306,10 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1)
.readTimeout(readTimeout)
.disableChunkedEncoding(true)
.build();

// HTTP server does not send a response
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
Expand Down Expand Up @@ -335,7 +343,10 @@ public void testWriteLargeBlob() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null);
final BlobContainer blobContainer = blobContainerBuilder().readTimeout(readTimeout)
.disableChunkedEncoding(true)
.bufferSize(bufferSize)
.build();

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand Down Expand Up @@ -431,7 +442,10 @@ public void testWriteLargeBlobStreaming() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null);
final BlobContainer blobContainer = blobContainerBuilder().readTimeout(readTimeout)
.disableChunkedEncoding(true)
.bufferSize(bufferSize)
.build();

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand Down Expand Up @@ -540,7 +554,10 @@ public void testReadRetriesAfterMeaningfulProgress() throws Exception {
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries)
.disableChunkedEncoding(true)
.bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes))
.build();
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);

final byte[] bytes = randomBlobContent();
Expand Down Expand Up @@ -613,7 +630,10 @@ public void testReadDoesNotRetryForRepositoryAnalysis() {
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries)
.disableChunkedEncoding(true)
.bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes))
.build();

final byte[] bytes = randomBlobContent();

Expand Down Expand Up @@ -651,7 +671,10 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException {
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries)
.disableChunkedEncoding(true)
.bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes))
.build();
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);

final byte[] bytes = randomBlobContent(512);
Expand Down Expand Up @@ -744,7 +767,7 @@ public void handle(HttpExchange exchange) throws IOException {

public void testDoesNotRetryOnNotFound() {
final int maxRetries = between(3, 5);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).disableChunkedEncoding(true).build();

final AtomicInteger numberOfReads = new AtomicInteger(0);
@SuppressForbidden(reason = "use a http server")
Expand Down Expand Up @@ -777,7 +800,11 @@ public void handle(HttpExchange exchange) throws IOException {
public void testSuppressedDeletionErrorsAreCapped() {
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
int maxBulkDeleteSize = randomIntBetween(1, 10);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1)
.readTimeout(readTimeout)
.disableChunkedEncoding(true)
.maxBulkDeletes(maxBulkDeleteSize)
.build();
httpServer.createContext("/", exchange -> {
if (isMultiDeleteRequest(exchange)) {
exchange.sendResponseHeaders(
Expand Down Expand Up @@ -809,7 +836,11 @@ public void testSuppressedDeletionErrorsAreCapped() {
public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() {
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
int maxBulkDeleteSize = randomIntBetween(10, 30);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1)
.readTimeout(readTimeout)
.disableChunkedEncoding(true)
.maxBulkDeletes(maxBulkDeleteSize)
.build();

final Pattern pattern = Pattern.compile("<Key>(.+?)</Key>");
httpServer.createContext("/", exchange -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected BlobContainer createBlobContainer(
Integer maxRetries,
TimeValue readTimeout,
Boolean disableChunkedEncoding,
Integer maxConnections,
ByteSizeValue bufferSize,
Integer maxBulkDeletes,
BlobPath blobContainerPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,13 @@ public void tearDown() throws Exception {

protected abstract Class<? extends Exception> unresponsiveExceptionType();

protected abstract BlobContainer createBlobContainer(
@Nullable Integer maxRetries,
@Nullable TimeValue readTimeout,
@Nullable Boolean disableChunkedEncoding,
@Nullable ByteSizeValue bufferSize,
@Nullable Integer maxBulkDeletes,
@Nullable BlobPath blobContainerPath
);

protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
return either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class))
.or(instanceOf(RuntimeException.class));
}

public void testReadNonexistentBlobThrowsNoSuchFileException() {
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(between(1, 5)).build();
final long position = randomLongBetween(0, MAX_RANGE_VAL);
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
Expand All @@ -121,7 +112,7 @@ public void testReadBlobWithRetries() throws Exception {

final byte[] bytes = randomBlobContent();
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build();
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
Expand Down Expand Up @@ -178,7 +169,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
final CountDown countDown = new CountDown(maxRetries + 1);

final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build();
final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
Expand Down Expand Up @@ -250,7 +241,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomInt(5);
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build();

// HTTP server does not send a response
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {});
Expand Down Expand Up @@ -307,7 +298,7 @@ protected OperationPurpose randomFiniteRetryingPurpose() {

public void testReadBlobWithNoHttpResponse() {
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(randomInt(5)).readTimeout(readTimeout).build();

// HTTP server closes connection immediately
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
Expand All @@ -327,7 +318,7 @@ public void testReadBlobWithNoHttpResponse() {

public void testReadBlobWithPrematureConnectionClose() {
final int maxRetries = randomInt(20);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build();

final boolean alwaysFlushBody = randomBoolean();

Expand Down Expand Up @@ -504,4 +495,89 @@ private void ensureOpen() throws IOException {
}
}
}

/**
* Method for subclasses to define how to create a {@link BlobContainer} with the given (optional) parameters. Callers should use
* {@link #blobContainerBuilder} to obtain a builder to construct the arguments to this method rather than calling it directly.
*/
protected abstract BlobContainer createBlobContainer(
@Nullable Integer maxRetries,
@Nullable TimeValue readTimeout,
@Nullable Boolean disableChunkedEncoding,
@Nullable Integer maxConnections,
@Nullable ByteSizeValue bufferSize,
@Nullable Integer maxBulkDeletes,
@Nullable BlobPath blobContainerPath
);

protected final class TestBlobContainerBuilder {
@Nullable
private Integer maxRetries;
@Nullable
private TimeValue readTimeout;
@Nullable
private Boolean disableChunkedEncoding;
@Nullable
private Integer maxConnections;
@Nullable
private ByteSizeValue bufferSize;
@Nullable
private Integer maxBulkDeletes;
@Nullable
private BlobPath blobContainerPath;

public TestBlobContainerBuilder maxRetries(@Nullable Integer maxRetries) {
this.maxRetries = maxRetries;
return this;
}

public TestBlobContainerBuilder readTimeout(@Nullable TimeValue readTimeout) {
this.readTimeout = readTimeout;
return this;
}

public TestBlobContainerBuilder disableChunkedEncoding(@Nullable Boolean disableChunkedEncoding) {
this.disableChunkedEncoding = disableChunkedEncoding;
return this;
}

public TestBlobContainerBuilder maxConnections(@Nullable Integer maxConnections) {
this.maxConnections = maxConnections;
return this;
}

public TestBlobContainerBuilder bufferSize(@Nullable ByteSizeValue bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public TestBlobContainerBuilder maxBulkDeletes(@Nullable Integer maxBulkDeletes) {
this.maxBulkDeletes = maxBulkDeletes;
return this;
}

public TestBlobContainerBuilder blobContainerPath(@Nullable BlobPath blobContainerPath) {
this.blobContainerPath = blobContainerPath;
return this;
}

public BlobContainer build() {
return createBlobContainer(
maxRetries,
readTimeout,
disableChunkedEncoding,
maxConnections,
bufferSize,
maxBulkDeletes,
blobContainerPath
);
}
}

/**
* @return a {@link TestBlobContainerBuilder} to construct the arguments with which to call {@link #createBlobContainer}.
*/
protected final TestBlobContainerBuilder blobContainerBuilder() {
return new TestBlobContainerBuilder();
}
}