Skip to content

Commit 258602c

Browse files
committed
Add full conditional write support to S3 test fixture
In elastic#133030 we added limited support for conditional writes in `S3HttpHandler`, allowing callers to prevent overwriting an existing blob with an `If-None-Match: *` precondition header. This commit extends the implementation to include support for the `If-Match: <etag>` precondition header allowing callers to perform atomic compare-and-set operations which overwrite existing objects.
1 parent 0e5dc16 commit 258602c

File tree

2 files changed

+78
-13
lines changed

2 files changed

+78
-13
lines changed

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.Set;
4545
import java.util.concurrent.ConcurrentHashMap;
4646
import java.util.concurrent.ConcurrentMap;
47+
import java.util.concurrent.atomic.AtomicBoolean;
4748
import java.util.regex.Matcher;
4849
import java.util.regex.Pattern;
4950

@@ -248,6 +249,9 @@ public void handle(final HttpExchange exchange) throws IOException {
248249
if (isProtectOverwrite(exchange)) {
249250
throw new AssertionError("If-None-Match: * header is not supported here");
250251
}
252+
if (getRequiredExistingETag(exchange) != null) {
253+
throw new AssertionError("If-Match: * header is not supported here");
254+
}
251255

252256
var sourceBlob = blobs.get(copySource);
253257
if (sourceBlob == null) {
@@ -412,10 +416,24 @@ public void handle(final HttpExchange exchange) throws IOException {
412416
private boolean updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) {
413417
if (isProtectOverwrite(exchange)) {
414418
return blobs.putIfAbsent(path, newContents) == null;
415-
} else {
416-
blobs.put(path, newContents);
417-
return true;
418419
}
420+
421+
final var requireExistingETag = getRequiredExistingETag(exchange);
422+
if (requireExistingETag != null) {
423+
final var success = new AtomicBoolean(true);
424+
blobs.compute(path, (ignoredPath, existingContents) -> {
425+
if (existingContents != null && requireExistingETag.equals(getEtagFromContents(existingContents))) {
426+
return newContents;
427+
}
428+
429+
success.set(false);
430+
return existingContents;
431+
});
432+
return success.get();
433+
}
434+
435+
blobs.put(path, newContents);
436+
return true;
419437
}
420438

421439
/**
@@ -594,6 +612,9 @@ private static boolean isProtectOverwrite(final HttpExchange exchange) {
594612
return false;
595613
}
596614

615+
if (exchange.getRequestHeaders().get("If-Match") != null) {
616+
throw new AssertionError("Handling both If-None-Match and If-Match headers is not supported");
617+
}
597618
if (ifNoneMatch.size() != 1) {
598619
throw new AssertionError("multiple If-None-Match headers found: " + ifNoneMatch);
599620
}
@@ -605,6 +626,29 @@ private static boolean isProtectOverwrite(final HttpExchange exchange) {
605626
throw new AssertionError("invalid If-None-Match header: " + ifNoneMatch);
606627
}
607628

629+
@Nullable // if no If-Match header found
630+
private static String getRequiredExistingETag(final HttpExchange exchange) {
631+
final var ifMatch = exchange.getRequestHeaders().get("If-Match");
632+
633+
if (ifMatch == null) {
634+
return null;
635+
}
636+
637+
if (exchange.getRequestHeaders().get("If-None-Match") != null) {
638+
throw new AssertionError("Handling both If-None-Match and If-Match headers is not supported");
639+
}
640+
641+
final var iterator = ifMatch.iterator();
642+
if (iterator.hasNext()) {
643+
final var result = iterator.next();
644+
if (iterator.hasNext() == false) {
645+
return result;
646+
}
647+
}
648+
649+
throw new AssertionError("multiple If-Match headers found: " + ifMatch);
650+
}
651+
608652
MultipartUpload putUpload(String path) {
609653
final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path);
610654
synchronized (uploads) {

test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -413,13 +413,29 @@ public void testExtractPartEtags() {
413413
}
414414

415415
public void testPreventObjectOverwrite() throws InterruptedException {
416+
ensureExactlyOneSuccess(new S3HttpHandler("bucket", "path"), null);
417+
}
418+
419+
public void testConditionalOverwrite() throws InterruptedException {
416420
final var handler = new S3HttpHandler("bucket", "path");
417421

422+
final var originalBody = new BytesArray(randomAlphaOfLength(50).getBytes(StandardCharsets.UTF_8));
423+
final var originalETag = S3HttpHandler.getEtagFromContents(originalBody);
424+
assertEquals(RestStatus.OK, handleRequest(handler, "PUT", "/bucket/path/blob", originalBody).status());
425+
assertEquals(
426+
new TestHttpResponse(RestStatus.OK, originalBody, addETag(originalETag, TestHttpExchange.EMPTY_HEADERS)),
427+
handleRequest(handler, "GET", "/bucket/path/blob")
428+
);
429+
430+
ensureExactlyOneSuccess(handler, originalETag);
431+
}
432+
433+
private static void ensureExactlyOneSuccess(S3HttpHandler handler, String originalETag) throws InterruptedException {
418434
var tasks = List.of(
419-
createPutObjectTask(handler),
420-
createPutObjectTask(handler),
421-
createMultipartUploadTask(handler),
422-
createMultipartUploadTask(handler)
435+
createPutObjectTask(handler, originalETag),
436+
createPutObjectTask(handler, originalETag),
437+
createMultipartUploadTask(handler, originalETag),
438+
createMultipartUploadTask(handler, originalETag)
423439
);
424440

425441
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
@@ -450,13 +466,14 @@ public void testPreventObjectOverwrite() throws InterruptedException {
450466
);
451467
}
452468

453-
private static TestWriteTask createPutObjectTask(S3HttpHandler handler) {
469+
private static TestWriteTask createPutObjectTask(S3HttpHandler handler, @Nullable String originalETag) {
454470
return new TestWriteTask(
455-
(task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, ifNoneMatchHeader()).status()
471+
(task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, conditionalWriteHeader(originalETag))
472+
.status()
456473
);
457474
}
458475

459-
private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler) {
476+
private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler, @Nullable String originalETag) {
460477
final var multipartUploadTask = new TestWriteTask(
461478
(task) -> task.status = handleRequest(
462479
handler,
@@ -470,7 +487,7 @@ private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler) {
470487
<PartNumber>1</PartNumber>
471488
</Part>
472489
</CompleteMultipartUpload>""", task.etag)),
473-
ifNoneMatchHeader()
490+
conditionalWriteHeader(originalETag)
474491
).status()
475492
);
476493

@@ -599,9 +616,13 @@ private static Headers contentRangeHeader(long start, long end, long length) {
599616
return headers;
600617
}
601618

602-
private static Headers ifNoneMatchHeader() {
619+
private static Headers conditionalWriteHeader(@Nullable String originalEtag) {
603620
var headers = new Headers();
604-
headers.put("If-None-Match", List.of("*"));
621+
if (originalEtag == null) {
622+
headers.put("If-None-Match", List.of("*"));
623+
} else {
624+
headers.put("If-Match", List.of(originalEtag));
625+
}
605626
return headers;
606627
}
607628

0 commit comments

Comments
 (0)