|
32 | 32 | import java.util.ArrayList; |
33 | 33 | import java.util.List; |
34 | 34 | import java.util.Objects; |
| 35 | +import java.util.concurrent.Executors; |
| 36 | +import java.util.concurrent.TimeUnit; |
| 37 | +import java.util.function.Consumer; |
35 | 38 |
|
36 | 39 | import static org.hamcrest.Matchers.allOf; |
37 | 40 | import static org.hamcrest.Matchers.containsString; |
38 | 41 | import static org.hamcrest.Matchers.greaterThan; |
| 42 | +import static org.hamcrest.Matchers.hasSize; |
39 | 43 |
|
40 | 44 | public class S3HttpHandlerTests extends ESTestCase { |
41 | 45 |
|
@@ -383,35 +387,80 @@ public void testExtractPartEtags() { |
383 | 387 |
|
384 | 388 | } |
385 | 389 |
|
386 | | - public void testPreventObjectOverwrite() { |
| 390 | + public void testPreventObjectOverwrite() throws InterruptedException { |
387 | 391 | final var handler = new S3HttpHandler("bucket", "path"); |
388 | 392 |
|
389 | | - final var body = randomBytesReference(50); |
390 | | - assertEquals(RestStatus.OK, handleRequest(handler, "PUT", "/bucket/path/blob", body, ifNoneMatchHeader()).status()); |
391 | | - assertEquals( |
392 | | - RestStatus.PRECONDITION_FAILED, |
393 | | - handleRequest(handler, "PUT", "/bucket/path/blob", body, ifNoneMatchHeader()).status() |
394 | | - ); |
395 | | - |
396 | | - // multipart upload |
397 | | - final var createUploadResponse = handleRequest(handler, "POST", "/bucket/path/blob?uploads"); |
398 | | - final var uploadId = getUploadId(createUploadResponse.body()); |
399 | | - |
400 | | - final var part1 = randomAlphaOfLength(50); |
401 | | - final var uploadPart1Response = handleRequest(handler, "PUT", "/bucket/path/blob?uploadId=" + uploadId + "&partNumber=1", part1); |
402 | | - final var part1Etag = Objects.requireNonNull(uploadPart1Response.etag()); |
403 | | - |
404 | | - assertEquals( |
405 | | - RestStatus.PRECONDITION_FAILED, |
406 | | - handleRequest(handler, "POST", "/bucket/path/blob?uploadId=" + uploadId, new BytesArray(Strings.format(""" |
| 393 | + Consumer<TestWriteTask> putObjectConsumer = (task) -> task.status = handleRequest( |
| 394 | + handler, |
| 395 | + "PUT", |
| 396 | + "/bucket/path/blob", |
| 397 | + task.body, |
| 398 | + ifNoneMatchHeader() |
| 399 | + ).status(); |
| 400 | + |
| 401 | + Consumer<TestWriteTask> prepareMultipartUploadConsumer = (task) -> { |
| 402 | + final var createUploadResponse = handleRequest(handler, "POST", "/bucket/path/blob?uploads"); |
| 403 | + task.uploadId = getUploadId(createUploadResponse.body()); |
| 404 | + |
| 405 | + final var uploadPart1Response = handleRequest( |
| 406 | + handler, |
| 407 | + "PUT", |
| 408 | + "/bucket/path/blob?uploadId=" + task.uploadId + "&partNumber=1", |
| 409 | + task.body |
| 410 | + ); |
| 411 | + task.etag = Objects.requireNonNull(uploadPart1Response.etag()); |
| 412 | + }; |
| 413 | + |
| 414 | + Consumer<TestWriteTask> completeMultipartUploadConsumer = (task) -> { |
| 415 | + task.status = handleRequest(handler, "POST", "/bucket/path/blob?uploadId=" + task.uploadId, new BytesArray(Strings.format(""" |
407 | 416 | <?xml version="1.0" encoding="UTF-8"?> |
408 | 417 | <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> |
409 | 418 | <Part> |
410 | 419 | <ETag>%s</ETag> |
411 | 420 | <PartNumber>1</PartNumber> |
412 | 421 | </Part> |
413 | | - </CompleteMultipartUpload>""", part1Etag)), ifNoneMatchHeader()).status() |
| 422 | + </CompleteMultipartUpload>""", task.etag)), ifNoneMatchHeader()).status(); |
| 423 | + }; |
| 424 | + |
| 425 | + var tasks = List.of( |
| 426 | + new TestWriteTask(putObjectConsumer), |
| 427 | + new TestWriteTask(putObjectConsumer), |
| 428 | + new TestWriteTask(completeMultipartUploadConsumer, prepareMultipartUploadConsumer), |
| 429 | + new TestWriteTask(completeMultipartUploadConsumer, prepareMultipartUploadConsumer) |
414 | 430 | ); |
| 431 | + |
| 432 | + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { |
| 433 | + tasks.forEach(task -> executor.submit(task.consumer)); |
| 434 | + executor.shutdown(); |
| 435 | + var done = executor.awaitTermination(1, TimeUnit.SECONDS); |
| 436 | + assertTrue(done); |
| 437 | + } |
| 438 | + |
| 439 | + List<TestWriteTask> successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList(); |
| 440 | + assertThat(successfulTasks, hasSize(1)); |
| 441 | + |
| 442 | + assertEquals( |
| 443 | + new TestHttpResponse(RestStatus.OK, successfulTasks.getFirst().body, TestHttpExchange.EMPTY_HEADERS), |
| 444 | + handleRequest(handler, "GET", "/bucket/path/blob") |
| 445 | + ); |
| 446 | + } |
| 447 | + |
| 448 | + private static class TestWriteTask { |
| 449 | + final BytesReference body; |
| 450 | + final Runnable consumer; |
| 451 | + String uploadId; |
| 452 | + String etag; |
| 453 | + RestStatus status; |
| 454 | + |
| 455 | + TestWriteTask(Consumer<TestWriteTask> consumer, Consumer<TestWriteTask> prepare) { |
| 456 | + this(consumer); |
| 457 | + prepare.accept(this); |
| 458 | + } |
| 459 | + |
| 460 | + TestWriteTask(Consumer<TestWriteTask> consumer) { |
| 461 | + this.body = randomBytesReference(50); |
| 462 | + this.consumer = () -> consumer.accept(this); |
| 463 | + } |
415 | 464 | } |
416 | 465 |
|
417 | 466 | private void runExtractPartETagsTest(String body, String... expectedTags) { |
|
0 commit comments