Skip to content

Commit 35d427c

Browse files
committed
Test failIfAlreadyExists for concurrent writes
1 parent f3c380d commit 35d427c

File tree

1 file changed

+34
-6
lines changed

1 file changed

+34
-6
lines changed

test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -346,19 +346,39 @@ public void testSkipBeyondBlobLengthShouldThrowEOFException() throws IOException
346346
}
347347
}
348348

349-
public void testFailIfAlreadyExists() throws IOException {
349+
public void testFailIfAlreadyExists() {
350350
final var blobName = randomIdentifier();
351351
final int blobLength = randomIntBetween(100, 2_000);
352352
final var initialBlobBytes = randomBytesReference(blobLength);
353353
final var overwriteBlobBytes = randomBytesReference(blobLength);
354354

355355
final var repository = getRepository();
356356

357-
// initial write blob
358-
executeOnBlobStore(repository, blobStore -> {
359-
blobStore.writeBlob(randomPurpose(), blobName, initialBlobBytes, true);
357+
CheckedFunction<BlobContainer, Void, IOException> initialWrite = blobStore -> {
358+
blobStore.writeBlobAtomic(randomPurpose(), blobName, initialBlobBytes, true);
360359
return null;
361-
});
360+
};
361+
362+
// initial write blob
363+
var initialWrite1 = submitOnBlobStore(repository, initialWrite);
364+
var initialWrite2 = submitOnBlobStore(repository, initialWrite);
365+
366+
Exception ex1 = null;
367+
Exception ex2 = null;
368+
369+
try {
370+
initialWrite1.actionGet();
371+
} catch (Exception e) {
372+
ex1 = e;
373+
}
374+
375+
try {
376+
initialWrite2.actionGet();
377+
} catch (Exception e) {
378+
ex2 = e;
379+
}
380+
381+
assertTrue("Exactly one of the writes must fail", ex1 != null ^ ex2 != null);
362382

363383
// override if failIfAlreadyExists is set to false
364384
executeOnBlobStore(repository, blobStore -> {
@@ -410,12 +430,20 @@ protected void testReadFromPositionLargerThanBlobLength(Predicate<RequestedRange
410430
assertThat(responseCodeChecker.test(rangeNotSatisfiedException), is(true));
411431
}
412432

413-
protected static <T> T executeOnBlobStore(BlobStoreRepository repository, CheckedFunction<BlobContainer, T, IOException> fn) {
433+
protected static <T> PlainActionFuture<T> submitOnBlobStore(
434+
BlobStoreRepository repository,
435+
CheckedFunction<BlobContainer, T, IOException> fn
436+
) {
414437
final var future = new PlainActionFuture<T>();
415438
repository.threadPool().generic().execute(ActionRunnable.supply(future, () -> {
416439
var blobContainer = repository.blobStore().blobContainer(repository.basePath());
417440
return fn.apply(blobContainer);
418441
}));
442+
return future;
443+
}
444+
445+
protected static <T> T executeOnBlobStore(BlobStoreRepository repository, CheckedFunction<BlobContainer, T, IOException> fn) {
446+
final var future = submitOnBlobStore(repository, fn);
419447
return future.actionGet();
420448
}
421449

0 commit comments

Comments
 (0)