Skip to content

Commit a9650d7

Browse files
committed
[Test] Add test for ES-10931
1 parent 8176746 commit a9650d7

File tree

1 file changed

+99
-45
lines changed

1 file changed

+99
-45
lines changed

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 99 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.sun.net.httpserver.HttpHandler;
2222

2323
import org.apache.http.HttpStatus;
24+
import org.apache.lucene.index.CorruptIndexException;
25+
import org.apache.lucene.store.AlreadyClosedException;
2426
import org.elasticsearch.ExceptionsHelper;
2527
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
2628
import org.elasticsearch.common.BackoffPolicy;
@@ -78,6 +80,7 @@
7880
import java.util.concurrent.atomic.AtomicBoolean;
7981
import java.util.concurrent.atomic.AtomicInteger;
8082
import java.util.concurrent.atomic.AtomicLong;
83+
import java.util.concurrent.atomic.AtomicReference;
8184
import java.util.function.IntConsumer;
8285

8386
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
@@ -332,6 +335,57 @@ public void testWriteBlobWithReadTimeouts() {
332335
assertThat(exception.getCause().getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
333336
}
334337

338+
/**
339+
* This test shows that the AWS SDKv1 defers the closing of the InputStream used to upload a blob after the HTTP request has been sent
340+
* to S3, swallowing any exception thrown at closing time.
341+
*/
342+
public void testWriteBlobWithExceptionThrownAtClosingTime() throws Exception {
343+
var maxRetries = randomInt(3);
344+
var blobLength = randomIntBetween(1, 4096 * 3);
345+
var blobName = getTestName().toLowerCase(Locale.ROOT);
346+
var blobContainer = createBlobContainer(maxRetries, null, true, null);
347+
348+
var uploadedBytes = new AtomicReference<BytesReference>();
349+
httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> {
350+
var requestComponents = S3HttpHandler.parseRequestComponents(S3HttpHandler.getRawRequestString(exchange));
351+
if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) {
352+
var body = Streams.readFully(exchange.getRequestBody());
353+
if (uploadedBytes.compareAndSet(null, body)) {
354+
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
355+
exchange.close();
356+
return;
357+
}
358+
}
359+
exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1);
360+
exchange.close();
361+
});
362+
363+
final byte[] bytes = randomByteArrayOfLength(blobLength);
364+
365+
var exceptionThrown = new AtomicBoolean();
366+
blobContainer.writeBlobAtomic(randomPurpose(), blobName, new FilterInputStream(new ByteArrayInputStream(bytes)) {
367+
@Override
368+
public void close() throws IOException {
369+
if (exceptionThrown.compareAndSet(false, true)) {
370+
switch (randomInt(3)) {
371+
case 0:
372+
throw new CorruptIndexException("simulated", blobName);
373+
case 1:
374+
throw new AlreadyClosedException("simulated");
375+
case 2:
376+
throw new RuntimeException("simulated");
377+
case 3:
378+
default:
379+
throw new IOException("simulated");
380+
}
381+
}
382+
}
383+
}, blobLength, true);
384+
385+
assertThat(exceptionThrown.get(), is(true));
386+
assertArrayEquals(bytes, BytesReference.toBytes(uploadedBytes.get()));
387+
}
388+
335389
public void testWriteLargeBlob() throws Exception {
336390
final boolean useTimeout = rarely();
337391
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
@@ -372,36 +426,36 @@ public void testWriteLargeBlob() throws Exception {
372426
} else if ("PUT".equals(requestComponents.method())
373427
&& requestComponents.query().contains("uploadId=TEST")
374428
&& requestComponents.query().contains("partNumber=")) {
375-
// upload part request
376-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
377-
BytesReference bytes = Streams.readFully(md5);
378-
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
379-
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
380-
381-
if (countDownUploads.decrementAndGet() % 2 == 0) {
382-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
383-
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
384-
exchange.close();
385-
return;
386-
}
429+
// upload part request
430+
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
431+
BytesReference bytes = Streams.readFully(md5);
432+
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
433+
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
434+
435+
if (countDownUploads.decrementAndGet() % 2 == 0) {
436+
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
437+
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
438+
exchange.close();
439+
return;
440+
}
387441

388-
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
389-
// complete multipart upload request
390-
if (countDownComplete.countDown()) {
391-
Streams.readFully(exchange.getRequestBody());
392-
byte[] response = ("""
442+
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
443+
// complete multipart upload request
444+
if (countDownComplete.countDown()) {
445+
Streams.readFully(exchange.getRequestBody());
446+
byte[] response = ("""
393447
<?xml version="1.0" encoding="UTF-8"?>
394448
<CompleteMultipartUploadResult>
395449
<Bucket>bucket</Bucket>
396450
<Key>write_large_blob</Key>
397451
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
398-
exchange.getResponseHeaders().add("Content-Type", "application/xml");
399-
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
400-
exchange.getResponseBody().write(response);
401-
exchange.close();
402-
return;
403-
}
452+
exchange.getResponseHeaders().add("Content-Type", "application/xml");
453+
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
454+
exchange.getResponseBody().write(response);
455+
exchange.close();
456+
return;
404457
}
458+
}
405459

406460
// sends an error back or let the request time out
407461
if (useTimeout == false) {
@@ -474,35 +528,35 @@ public void testWriteLargeBlobStreaming() throws Exception {
474528
} else if ("PUT".equals(requestComponents.method())
475529
&& requestComponents.query().contains("uploadId=TEST")
476530
&& requestComponents.query().contains("partNumber=")) {
477-
// upload part request
478-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
479-
BytesReference bytes = Streams.readFully(md5);
480-
481-
if (counterUploads.incrementAndGet() % 2 == 0) {
482-
bytesReceived.addAndGet(bytes.length());
483-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
484-
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
485-
exchange.close();
486-
return;
487-
}
531+
// upload part request
532+
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
533+
BytesReference bytes = Streams.readFully(md5);
534+
535+
if (counterUploads.incrementAndGet() % 2 == 0) {
536+
bytesReceived.addAndGet(bytes.length());
537+
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
538+
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
539+
exchange.close();
540+
return;
541+
}
488542

489-
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
490-
// complete multipart upload request
491-
if (countDownComplete.countDown()) {
492-
Streams.readFully(exchange.getRequestBody());
493-
byte[] response = ("""
543+
} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
544+
// complete multipart upload request
545+
if (countDownComplete.countDown()) {
546+
Streams.readFully(exchange.getRequestBody());
547+
byte[] response = ("""
494548
<?xml version="1.0" encoding="UTF-8"?>
495549
<CompleteMultipartUploadResult>
496550
<Bucket>bucket</Bucket>
497551
<Key>write_large_blob_streaming</Key>
498552
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
499-
exchange.getResponseHeaders().add("Content-Type", "application/xml");
500-
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
501-
exchange.getResponseBody().write(response);
502-
exchange.close();
503-
return;
504-
}
553+
exchange.getResponseHeaders().add("Content-Type", "application/xml");
554+
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
555+
exchange.getResponseBody().write(response);
556+
exchange.close();
557+
return;
505558
}
559+
}
506560

507561
// sends an error back or let the request time out
508562
if (useTimeout == false) {

0 commit comments

Comments
 (0)