diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 2e801be139499..f42dde4dcb091 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -126,6 +126,7 @@ protected BlobContainer createBlobContainer( final @Nullable Integer maxRetries, final @Nullable TimeValue readTimeout, final @Nullable Boolean disableChunkedEncoding, + final @Nullable Integer maxConnections, final @Nullable ByteSizeValue bufferSize, final @Nullable Integer maxBulkDeletes, final @Nullable BlobPath blobContainerPath @@ -228,7 +229,7 @@ public void testShouldRetryOnUnresolvableHost() { private void executeListBlobsAndAssertRetries() { final int maxRetries = randomIntBetween(3, 5); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); expectThrows(StorageException.class, () -> blobContainer.listBlobs(randomPurpose())); assertEquals(maxRetries + 1, requestCounters.get("/storage/v1/b/bucket/o").get()); } @@ -237,7 +238,7 @@ public void testReadLargeBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(2, 10); final AtomicInteger countDown = new AtomicInteger(maxRetries); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); // SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks final byte[] bytes = randomBytes(1 << 22); @@ -266,7 +267,7 @@ public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(2, 10); final CountDown countDown = new CountDown(maxRetries); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); final byte[] bytes = randomBlobContent(); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart")); @@ -308,7 +309,7 @@ public void testWriteBlobWithRetries() throws Exception { public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null, null); // HTTP server does not send a response httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> { @@ -360,7 +361,7 @@ public void testWriteLargeBlob() throws IOException { logger.debug("starting with resumable upload id [{}]", sessionUploadId.get()); final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null; - final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null, null); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { final BytesReference requestBody = Streams.readFully(exchange.getRequestBody()); @@ -507,7 +508,7 @@ public String next() { return Integer.toString(totalDeletesSent++); } }; - final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null, null); httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> { assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH; @@ -543,7 +544,7 @@ public void testCompareAndExchangeWhenThrottled() throws IOException { httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket"))); final int maxRetries = randomIntBetween(1, 3); - final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null, null, null); final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH)); final String key = randomIdentifier(); diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 7b3d96b878ac3..cce6e14c9dbb4 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -89,6 +89,7 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING; import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING; +import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_CONNECTIONS_SETTING; import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING; import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING; import static org.hamcrest.Matchers.allOf; @@ -163,6 +164,7 @@ protected BlobContainer createBlobContainer( final @Nullable Integer maxRetries, final @Nullable TimeValue readTimeout, final @Nullable Boolean disableChunkedEncoding, + final @Nullable Integer maxConnections, final @Nullable ByteSizeValue bufferSize, final @Nullable Integer maxBulkDeletes, final @Nullable BlobPath blobContainerPath @@ -183,6 +185,9 @@ protected BlobContainer createBlobContainer( if (disableChunkedEncoding != null) { clientSettings.put(DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace(clientName).getKey(), disableChunkedEncoding); } + if (maxConnections != null) { + clientSettings.put(MAX_CONNECTIONS_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxConnections); + } final MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setString( @@ -265,7 +270,7 @@ public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null, null); final byte[] bytes = randomBlobContent(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> { @@ -313,7 +318,7 @@ public void testWriteBlobWithRetries() throws Exception { public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null); + final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null, null); // HTTP server does not send a response httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> { @@ -347,7 +352,7 @@ public void testWriteLargeBlob() throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB); - final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null); + final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, null, bufferSize, null, null); final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); @@ -443,7 +448,7 @@ public void testWriteLargeBlobStreaming() throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB); - final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null); + final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, null, bufferSize, null, null); final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); @@ -552,7 +557,15 @@ public void testReadRetriesAfterMeaningfulProgress() throws Exception { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null); + final BlobContainer blobContainer = createBlobContainer( + maxRetries, + null, + true, + null, + ByteSizeValue.ofBytes(bufferSizeBytes), + null, + null + ); final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100); final byte[] bytes = randomBlobContent(); @@ -625,7 +638,15 @@ public void testReadDoesNotRetryForRepositoryAnalysis() { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null); + final BlobContainer blobContainer = createBlobContainer( + maxRetries, + null, + true, + null, + ByteSizeValue.ofBytes(bufferSizeBytes), + null, + null + ); final byte[] bytes = randomBlobContent(); @@ -663,7 +684,15 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null); + final BlobContainer blobContainer = createBlobContainer( + maxRetries, + null, + true, + null, + ByteSizeValue.ofBytes(bufferSizeBytes), + null, + null + ); final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100); final byte[] bytes = randomBlobContent(512); @@ -756,7 +785,7 @@ public void handle(HttpExchange exchange) throws IOException { public void testDoesNotRetryOnNotFound() { final int maxRetries = between(3, 5); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null, null); final AtomicInteger numberOfReads = new AtomicInteger(0); @SuppressForbidden(reason = "use a http server") @@ -788,7 +817,7 @@ public void handle(HttpExchange exchange) throws IOException { public void testSnapshotDeletesRetryOnThrottlingError() throws IOException { // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null); + final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -808,7 +837,7 @@ public void testSnapshotDeletesRetryOnThrottlingError() throws IOException { public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() { // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null); + final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -845,7 +874,7 @@ public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() { public void testNonSnapshotDeletesAreNotRetried() { // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null); + final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -874,7 +903,7 @@ public void testNonSnapshotDeletesAreNotRetried() { public void testNonThrottlingErrorsAreNotRetried() { // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null); + final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -953,7 +982,7 @@ private Set operationPurposesThatRetryOnDelete() { public void testGetRegisterRetries() { final var maxRetries = between(0, 3); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); interface FailingHandlerFactory { void addHandler(String blobName, Integer... responseCodes); @@ -1023,7 +1052,7 @@ interface FailingHandlerFactory { public void testSuppressedDeletionErrorsAreCapped() { final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); int maxBulkDeleteSize = randomIntBetween(1, 10); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null); + final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, maxBulkDeleteSize, null); httpServer.createContext("/", exchange -> { if (isMultiDeleteRequest(exchange)) { exchange.sendResponseHeaders( @@ -1055,7 +1084,7 @@ public void testSuppressedDeletionErrorsAreCapped() { public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() { final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); int maxBulkDeleteSize = randomIntBetween(10, 30); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null); + final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, maxBulkDeleteSize, null); final Pattern pattern = Pattern.compile("(.+?)"); httpServer.createContext("/", exchange -> { diff --git a/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java b/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java index 4b0125d351943..e0df31e0d5297 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/common/blobstore/url/URLBlobContainerRetriesTests.java @@ -77,6 +77,7 @@ protected BlobContainer createBlobContainer( Integer maxRetries, TimeValue readTimeout, Boolean disableChunkedEncoding, + Integer maxConnections, ByteSizeValue bufferSize, Integer maxBulkDeletes, BlobPath blobContainerPath diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java index 6d9c41362e2e3..8fcee49478305 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java @@ -83,6 +83,7 @@ protected abstract BlobContainer createBlobContainer( @Nullable Integer maxRetries, @Nullable TimeValue readTimeout, @Nullable Boolean disableChunkedEncoding, + @Nullable Integer maxConnections, @Nullable ByteSizeValue bufferSize, @Nullable Integer maxBulkDeletes, @Nullable BlobPath blobContainerPath @@ -94,7 +95,7 @@ protected org.hamcrest.Matcher readTimeoutExceptionMatcher() { } public void testReadNonexistentBlobThrowsNoSuchFileException() { - final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null, null); final long position = randomLongBetween(0, MAX_RANGE_VAL); final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position))); final Exception exception = expectThrows(NoSuchFileException.class, () -> { @@ -121,7 +122,7 @@ public void testReadBlobWithRetries() throws Exception { final byte[] bytes = randomBlobContent(); final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null); httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> { Streams.readFully(exchange.getRequestBody()); if (countDown.countDown()) { @@ -178,7 +179,7 @@ public void testReadRangeBlobWithRetries() throws Exception { final CountDown countDown = new CountDown(maxRetries + 1); final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null); final byte[] bytes = randomBlobContent(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> { Streams.readFully(exchange.getRequestBody()); @@ -250,7 +251,7 @@ public void testReadRangeBlobWithRetries() throws Exception { public void testReadBlobWithReadTimeouts() { final int maxRetries = randomInt(5); final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null); // HTTP server does not send a response httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {}); @@ -307,7 +308,7 @@ protected OperationPurpose randomFiniteRetryingPurpose() { public void testReadBlobWithNoHttpResponse() { final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); - final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null, null); // HTTP server closes connection immediately httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close); @@ -327,7 +328,7 @@ public void testReadBlobWithNoHttpResponse() { public void testReadBlobWithPrematureConnectionClose() { final int maxRetries = randomInt(20); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null); + final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); final boolean alwaysFlushBody = randomBoolean();