From a9650d7d6c4b1a003c06cc33811e52c9ab22ea53 Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 26 Feb 2025 15:53:45 +0100 Subject: [PATCH 1/2] [Test] Add test for ES-10931 --- .../s3/S3BlobContainerRetriesTests.java | 144 ++++++++++++------ 1 file changed, 99 insertions(+), 45 deletions(-) diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index fc9b2141a30a6..4d73ee324992a 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -21,6 +21,8 @@ import com.sun.net.httpserver.HttpHandler; import org.apache.http.HttpStatus; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.BackoffPolicy; @@ -78,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntConsumer; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose; @@ -332,6 +335,57 @@ public void testWriteBlobWithReadTimeouts() { assertThat(exception.getCause().getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out")); } + /** + * This test shows that the AWS SDKv1 defers the closing of the InputStream used to upload a blob after the HTTP request has been sent + * to S3, swallowing any exception thrown at closing time. + */ + public void testWriteBlobWithExceptionThrownAtClosingTime() throws Exception { + var maxRetries = randomInt(3); + var blobLength = randomIntBetween(1, 4096 * 3); + var blobName = getTestName().toLowerCase(Locale.ROOT); + var blobContainer = createBlobContainer(maxRetries, null, true, null); + + var uploadedBytes = new AtomicReference(); + httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> { + var requestComponents = S3HttpHandler.parseRequestComponents(S3HttpHandler.getRawRequestString(exchange)); + if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) { + var body = Streams.readFully(exchange.getRequestBody()); + if (uploadedBytes.compareAndSet(null, body)) { + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } + } + exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1); + exchange.close(); + }); + + final byte[] bytes = randomByteArrayOfLength(blobLength); + + var exceptionThrown = new AtomicBoolean(); + blobContainer.writeBlobAtomic(randomPurpose(), blobName, new FilterInputStream(new ByteArrayInputStream(bytes)) { + @Override + public void close() throws IOException { + if (exceptionThrown.compareAndSet(false, true)) { + switch (randomInt(3)) { + case 0: + throw new CorruptIndexException("simulated", blobName); + case 1: + throw new AlreadyClosedException("simulated"); + case 2: + throw new RuntimeException("simulated"); + case 3: + default: + throw new IOException("simulated"); + } + } + } + }, blobLength, true); + + assertThat(exceptionThrown.get(), is(true)); + assertArrayEquals(bytes, BytesReference.toBytes(uploadedBytes.get())); + } + public void testWriteLargeBlob() throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; @@ -372,36 +426,36 @@ public void testWriteLargeBlob() throws Exception { } else if ("PUT".equals(requestComponents.method()) && requestComponents.query().contains("uploadId=TEST") && requestComponents.query().contains("partNumber=")) { - // upload part request - MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); - BytesReference bytes = Streams.readFully(md5); - assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); - assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); - - if (countDownUploads.decrementAndGet() % 2 == 0) { - exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - exchange.close(); - return; - } + // upload part request + MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); + BytesReference bytes = Streams.readFully(md5); + assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + + if (countDownUploads.decrementAndGet() % 2 == 0) { + exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } - } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { - // complete multipart upload request - if (countDownComplete.countDown()) { - Streams.readFully(exchange.getRequestBody()); - byte[] response = (""" + } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { + // complete multipart upload request + if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); + byte[] response = (""" bucket write_large_blob """).getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - return; - } + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; } + } // sends an error back or let the request time out if (useTimeout == false) { @@ -474,35 +528,35 @@ public void testWriteLargeBlobStreaming() throws Exception { } else if ("PUT".equals(requestComponents.method()) && requestComponents.query().contains("uploadId=TEST") && requestComponents.query().contains("partNumber=")) { - // upload part request - MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); - BytesReference bytes = Streams.readFully(md5); - - if (counterUploads.incrementAndGet() % 2 == 0) { - bytesReceived.addAndGet(bytes.length()); - exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - exchange.close(); - return; - } + // upload part request + MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); + BytesReference bytes = Streams.readFully(md5); + + if (counterUploads.incrementAndGet() % 2 == 0) { + bytesReceived.addAndGet(bytes.length()); + exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } - } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { - // complete multipart upload request - if (countDownComplete.countDown()) { - Streams.readFully(exchange.getRequestBody()); - byte[] response = (""" + } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { + // complete multipart upload request + if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); + byte[] response = (""" bucket write_large_blob_streaming """).getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - return; - } + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; } + } // sends an error back or let the request time out if (useTimeout == false) { From e86aac42757e764670baf131b9e75c67bf424f36 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 26 Feb 2025 15:08:11 +0000 Subject: [PATCH 2/2] [CI] Auto commit changes from spotless --- .../s3/S3BlobContainerRetriesTests.java | 88 +++++++++---------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 4d73ee324992a..eb2589f881ab8 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -426,36 +426,36 @@ public void testWriteLargeBlob() throws Exception { } else if ("PUT".equals(requestComponents.method()) && requestComponents.query().contains("uploadId=TEST") && requestComponents.query().contains("partNumber=")) { - // upload part request - MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); - BytesReference bytes = Streams.readFully(md5); - assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); - assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); - - if (countDownUploads.decrementAndGet() % 2 == 0) { - exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - exchange.close(); - return; - } + // upload part request + MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); + BytesReference bytes = Streams.readFully(md5); + assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + + if (countDownUploads.decrementAndGet() % 2 == 0) { + exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } - } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { - // complete multipart upload request - if (countDownComplete.countDown()) { - Streams.readFully(exchange.getRequestBody()); - byte[] response = (""" + } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { + // complete multipart upload request + if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); + byte[] response = (""" bucket write_large_blob """).getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - return; + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + } } - } // sends an error back or let the request time out if (useTimeout == false) { @@ -528,35 +528,35 @@ public void testWriteLargeBlobStreaming() throws Exception { } else if ("PUT".equals(requestComponents.method()) && requestComponents.query().contains("uploadId=TEST") && requestComponents.query().contains("partNumber=")) { - // upload part request - MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); - BytesReference bytes = Streams.readFully(md5); + // upload part request + MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); + BytesReference bytes = Streams.readFully(md5); - if (counterUploads.incrementAndGet() % 2 == 0) { - bytesReceived.addAndGet(bytes.length()); - exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - exchange.close(); - return; - } + if (counterUploads.incrementAndGet() % 2 == 0) { + bytesReceived.addAndGet(bytes.length()); + exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + exchange.close(); + return; + } - } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { - // complete multipart upload request - if (countDownComplete.countDown()) { - Streams.readFully(exchange.getRequestBody()); - byte[] response = (""" + } else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) { + // complete multipart upload request + if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); + byte[] response = (""" bucket write_large_blob_streaming """).getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - return; + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + } } - } // sends an error back or let the request time out if (useTimeout == false) {