|
21 | 21 | import com.sun.net.httpserver.HttpHandler; |
22 | 22 |
|
23 | 23 | import org.apache.http.HttpStatus; |
| 24 | +import org.apache.lucene.index.CorruptIndexException; |
| 25 | +import org.apache.lucene.store.AlreadyClosedException; |
24 | 26 | import org.elasticsearch.ExceptionsHelper; |
25 | 27 | import org.elasticsearch.cluster.metadata.RepositoryMetadata; |
26 | 28 | import org.elasticsearch.common.BackoffPolicy; |
|
78 | 80 | import java.util.concurrent.atomic.AtomicBoolean; |
79 | 81 | import java.util.concurrent.atomic.AtomicInteger; |
80 | 82 | import java.util.concurrent.atomic.AtomicLong; |
| 83 | +import java.util.concurrent.atomic.AtomicReference; |
81 | 84 | import java.util.function.IntConsumer; |
82 | 85 |
|
83 | 86 | import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose; |
@@ -332,6 +335,57 @@ public void testWriteBlobWithReadTimeouts() { |
332 | 335 | assertThat(exception.getCause().getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out")); |
333 | 336 | } |
334 | 337 |
|
| 338 | + /** |
| 339 | + * This test shows that the AWS SDKv1 defers the closing of the InputStream used to upload a blob after the HTTP request has been sent |
| 340 | + * to S3, swallowing any exception thrown at closing time. |
| 341 | + */ |
| 342 | + public void testWriteBlobWithExceptionThrownAtClosingTime() throws Exception { |
| 343 | + var maxRetries = randomInt(3); |
| 344 | + var blobLength = randomIntBetween(1, 4096 * 3); |
| 345 | + var blobName = getTestName().toLowerCase(Locale.ROOT); |
| 346 | + var blobContainer = createBlobContainer(maxRetries, null, true, null); |
| 347 | + |
| 348 | + var uploadedBytes = new AtomicReference<BytesReference>(); |
| 349 | + httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> { |
| 350 | + var requestComponents = S3HttpHandler.parseRequestComponents(S3HttpHandler.getRawRequestString(exchange)); |
| 351 | + if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) { |
| 352 | + var body = Streams.readFully(exchange.getRequestBody()); |
| 353 | + if (uploadedBytes.compareAndSet(null, body)) { |
| 354 | + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); |
| 355 | + exchange.close(); |
| 356 | + return; |
| 357 | + } |
| 358 | + } |
| 359 | + exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1); |
| 360 | + exchange.close(); |
| 361 | + }); |
| 362 | + |
| 363 | + final byte[] bytes = randomByteArrayOfLength(blobLength); |
| 364 | + |
| 365 | + var exceptionThrown = new AtomicBoolean(); |
| 366 | + blobContainer.writeBlobAtomic(randomPurpose(), blobName, new FilterInputStream(new ByteArrayInputStream(bytes)) { |
| 367 | + @Override |
| 368 | + public void close() throws IOException { |
| 369 | + if (exceptionThrown.compareAndSet(false, true)) { |
| 370 | + switch (randomInt(3)) { |
| 371 | + case 0: |
| 372 | + throw new CorruptIndexException("simulated", blobName); |
| 373 | + case 1: |
| 374 | + throw new AlreadyClosedException("simulated"); |
| 375 | + case 2: |
| 376 | + throw new RuntimeException("simulated"); |
| 377 | + case 3: |
| 378 | + default: |
| 379 | + throw new IOException("simulated"); |
| 380 | + } |
| 381 | + } |
| 382 | + } |
| 383 | + }, blobLength, true); |
| 384 | + |
| 385 | + assertThat(exceptionThrown.get(), is(true)); |
| 386 | + assertArrayEquals(bytes, BytesReference.toBytes(uploadedBytes.get())); |
| 387 | + } |
| 388 | + |
335 | 389 | public void testWriteLargeBlob() throws Exception { |
336 | 390 | final boolean useTimeout = rarely(); |
337 | 391 | final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; |
|
0 commit comments