diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 7a7c1f6b5d723..70c2927245713 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -111,6 +111,7 @@ protected BlobContainer createBlobContainer( final @Nullable Integer maxRetries, final @Nullable TimeValue readTimeout, final @Nullable Boolean disableChunkedEncoding, + final @Nullable Integer maxConnections, final @Nullable ByteSizeValue bufferSize, final @Nullable Integer maxBulkDeletes, final @Nullable BlobPath blobContainerPath @@ -176,7 +177,7 @@ public void testReadLargeBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(2, 10); final AtomicInteger countDown = new AtomicInteger(maxRetries); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); // SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks final byte[] bytes = randomBytes(1 << 22); @@ -205,7 +206,7 @@ public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(2, 10); final CountDown countDown = new CountDown(maxRetries); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); final byte[] bytes = randomBlobContent(); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart")); @@ -247,7 +248,7 @@ public void testWriteBlobWithRetries() throws Exception { public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1).readTimeout(readTimeout).build(); // HTTP server does not send a response httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> { @@ -300,7 +301,7 @@ public void testWriteLargeBlob() throws IOException { logger.debug("starting with resumable upload id [{}]", sessionUploadId.get()); final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null; - final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(nbErrors + 1).readTimeout(readTimeout).build(); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { final BytesReference requestBody = Streams.readFully(exchange.getRequestBody()); @@ -440,7 +441,7 @@ public String next() { return Integer.toString(totalDeletesSent++); } }; - final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1).build(); httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> { assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH; diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index c830b7ccb273b..c40685c26c3da 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -81,6 +81,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING; import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_CONNECTIONS_SETTING; import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.hamcrest.Matchers.allOf; @@ -154,6 +155,7 @@ protected BlobContainer createBlobContainer( final @Nullable Integer maxRetries, final @Nullable TimeValue readTimeout, final @Nullable Boolean disableChunkedEncoding, + final @Nullable Integer maxConnections, final @Nullable ByteSizeValue bufferSize, final @Nullable Integer maxBulkDeletes, final @Nullable BlobPath blobContainerPath @@ -174,6 +176,9 @@ protected BlobContainer createBlobContainer( if (disableChunkedEncoding != null) { clientSettings.put(DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace(clientName).getKey(), disableChunkedEncoding); } + if (maxConnections != null) { + clientSettings.put(MAX_CONNECTIONS_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxConnections); + } final MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setString( @@ -253,7 +258,7 @@ public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).disableChunkedEncoding(true).build(); final byte[] bytes = randomBlobContent(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> { @@ -301,7 +306,10 @@ public void testWriteBlobWithRetries() throws Exception { public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1) + .readTimeout(readTimeout) + .disableChunkedEncoding(true) + .build(); // HTTP server does not send a response httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> { @@ -335,7 +343,10 @@ public void testWriteLargeBlob() throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB); - final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null); + final BlobContainer blobContainer = blobContainerBuilder().readTimeout(readTimeout) + .disableChunkedEncoding(true) + .bufferSize(bufferSize) + .build(); final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); @@ -431,7 +442,10 @@ public void testWriteLargeBlobStreaming() throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB); - final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null); + final BlobContainer blobContainer = blobContainerBuilder().readTimeout(readTimeout) + .disableChunkedEncoding(true) + .bufferSize(bufferSize) + .build(); final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); @@ -540,7 +554,10 @@ public void testReadRetriesAfterMeaningfulProgress() throws Exception { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries) + .disableChunkedEncoding(true) + .bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes)) + .build(); final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100); final byte[] bytes = randomBlobContent(); @@ -613,7 +630,10 @@ public void testReadDoesNotRetryForRepositoryAnalysis() { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries) + .disableChunkedEncoding(true) + .bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes)) + .build(); final byte[] bytes = randomBlobContent(); @@ -651,7 +671,10 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries) + .disableChunkedEncoding(true) + .bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes)) + .build(); final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100); final byte[] bytes = randomBlobContent(512); @@ -744,7 +767,7 @@ public void handle(HttpExchange exchange) throws IOException { public void testDoesNotRetryOnNotFound() { final int maxRetries = between(3, 5); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).disableChunkedEncoding(true).build(); final AtomicInteger numberOfReads = new AtomicInteger(0); @SuppressForbidden(reason = "use a http server") @@ -777,7 +800,11 @@ public void handle(HttpExchange exchange) throws IOException { public void testSuppressedDeletionErrorsAreCapped() { final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); int maxBulkDeleteSize = randomIntBetween(1, 10); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1) + .readTimeout(readTimeout) + .disableChunkedEncoding(true) + .maxBulkDeletes(maxBulkDeleteSize) + .build(); httpServer.createContext("/", exchange -> { if (isMultiDeleteRequest(exchange)) { exchange.sendResponseHeaders( @@ -809,7 +836,11 @@ public void testSuppressedDeletionErrorsAreCapped() { public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() { final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); int maxBulkDeleteSize = randomIntBetween(10, 30); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1) + .readTimeout(readTimeout) + .disableChunkedEncoding(true) + .maxBulkDeletes(maxBulkDeleteSize) + .build(); final Pattern pattern = Pattern.compile("(.+?)"); httpServer.createContext("/", exchange -> { diff --git a/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java b/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java index 4b0125d351943..e0df31e0d5297 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java @@ -77,6 +77,7 @@ protected BlobContainer createBlobContainer( Integer maxRetries, TimeValue readTimeout, Boolean disableChunkedEncoding, + Integer maxConnections, ByteSizeValue bufferSize, Integer maxBulkDeletes, BlobPath blobContainerPath diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java index 6d9c41362e2e3..398a7b3db3eef 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java @@ -79,22 +79,13 @@ public void tearDown() throws Exception { protected abstract Class unresponsiveExceptionType(); - protected abstract BlobContainer createBlobContainer( - @Nullable Integer maxRetries, - @Nullable TimeValue readTimeout, - @Nullable Boolean disableChunkedEncoding, - @Nullable ByteSizeValue bufferSize, - @Nullable Integer maxBulkDeletes, - @Nullable BlobPath blobContainerPath - ); - protected org.hamcrest.Matcher readTimeoutExceptionMatcher() { return either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class)) .or(instanceOf(RuntimeException.class)); } public void testReadNonexistentBlobThrowsNoSuchFileException() { - final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(between(1, 5)).build(); final long position = randomLongBetween(0, MAX_RANGE_VAL); final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position))); final Exception exception = expectThrows(NoSuchFileException.class, () -> { @@ -121,7 +112,7 @@ public void testReadBlobWithRetries() throws Exception { final byte[] bytes = randomBlobContent(); final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> { Streams.readFully(exchange.getRequestBody()); if (countDown.countDown()) { @@ -178,7 +169,7 @@ public void testReadRangeBlobWithRetries() throws Exception { final CountDown countDown = new CountDown(maxRetries + 1); final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build(); final byte[] bytes = randomBlobContent(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> { Streams.readFully(exchange.getRequestBody()); @@ -250,7 +241,7 @@ public void testReadRangeBlobWithRetries() throws Exception { public void testReadBlobWithReadTimeouts() { final int maxRetries = randomInt(5); final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build(); // HTTP server does not send a response httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {}); @@ -307,7 +298,7 @@ protected OperationPurpose randomFiniteRetryingPurpose() { public void testReadBlobWithNoHttpResponse() { final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); - final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(randomInt(5)).readTimeout(readTimeout).build(); // HTTP server closes connection immediately httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close); @@ -327,7 +318,7 @@ public void testReadBlobWithNoHttpResponse() { public void testReadBlobWithPrematureConnectionClose() { final int maxRetries = randomInt(20); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); final boolean alwaysFlushBody = randomBoolean(); @@ -504,4 +495,89 @@ private void ensureOpen() throws IOException { } } } + + /** + * Method for subclasses to define how to create a {@link BlobContainer} with the given (optional) parameters. Callers should use + * {@link #blobContainerBuilder} to obtain a builder to construct the arguments to this method rather than calling it directly. + */ + protected abstract BlobContainer createBlobContainer( + @Nullable Integer maxRetries, + @Nullable TimeValue readTimeout, + @Nullable Boolean disableChunkedEncoding, + @Nullable Integer maxConnections, + @Nullable ByteSizeValue bufferSize, + @Nullable Integer maxBulkDeletes, + @Nullable BlobPath blobContainerPath + ); + + protected final class TestBlobContainerBuilder { + @Nullable + private Integer maxRetries; + @Nullable + private TimeValue readTimeout; + @Nullable + private Boolean disableChunkedEncoding; + @Nullable + private Integer maxConnections; + @Nullable + private ByteSizeValue bufferSize; + @Nullable + private Integer maxBulkDeletes; + @Nullable + private BlobPath blobContainerPath; + + public TestBlobContainerBuilder maxRetries(@Nullable Integer maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public TestBlobContainerBuilder readTimeout(@Nullable TimeValue readTimeout) { + this.readTimeout = readTimeout; + return this; + } + + public TestBlobContainerBuilder disableChunkedEncoding(@Nullable Boolean disableChunkedEncoding) { + this.disableChunkedEncoding = disableChunkedEncoding; + return this; + } + + public TestBlobContainerBuilder maxConnections(@Nullable Integer maxConnections) { + this.maxConnections = maxConnections; + return this; + } + + public TestBlobContainerBuilder bufferSize(@Nullable ByteSizeValue bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public TestBlobContainerBuilder maxBulkDeletes(@Nullable Integer maxBulkDeletes) { + this.maxBulkDeletes = maxBulkDeletes; + return this; + } + + public TestBlobContainerBuilder blobContainerPath(@Nullable BlobPath blobContainerPath) { + this.blobContainerPath = blobContainerPath; + return this; + } + + public BlobContainer build() { + return createBlobContainer( + maxRetries, + readTimeout, + disableChunkedEncoding, + maxConnections, + bufferSize, + maxBulkDeletes, + blobContainerPath + ); + } + } + + /** + * @return a {@link TestBlobContainerBuilder} to construct the arguments with which to call {@link #createBlobContainer}. + */ + protected final TestBlobContainerBuilder blobContainerBuilder() { + return new TestBlobContainerBuilder(); + } }