Skip to content

Commit 689e2ca

Browse files
committed
Add integration test for concurrent multipart uploads on Azure
Relates ES-11815
1 parent 3bc6a43 commit 689e2ca

File tree

2 files changed

+93
-15
lines changed

2 files changed

+93
-15
lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,21 @@
3939
import org.elasticsearch.rest.RestStatus;
4040
import org.junit.ClassRule;
4141

42-
import java.io.ByteArrayInputStream;
42+
import java.io.BufferedInputStream;
43+
import java.io.BufferedOutputStream;
4344
import java.net.HttpURLConnection;
45+
import java.nio.channels.Channels;
46+
import java.nio.file.Files;
4447
import java.util.Collection;
48+
import java.util.zip.CRC32;
49+
import java.util.zip.CheckedInputStream;
50+
import java.util.zip.CheckedOutputStream;
4551

52+
import static org.elasticsearch.common.io.Streams.limitStream;
4653
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
4754
import static org.hamcrest.Matchers.blankOrNullString;
4855
import static org.hamcrest.Matchers.equalTo;
56+
import static org.hamcrest.Matchers.lessThan;
4957
import static org.hamcrest.Matchers.not;
5058

5159
/**
@@ -58,6 +66,27 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
5866

5967
private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");
6068

69+
/**
70+
* AzureRepositoryPlugin that sets a low value for getUploadBlockSize()
71+
*/
72+
public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {
73+
74+
public TestAzureRepositoryPlugin(Settings settings) {
75+
super(settings);
76+
}
77+
78+
@Override
79+
AzureStorageService createAzureStorageService(Settings settings, AzureClientProvider azureClientProvider) {
80+
final long blockSize = ByteSizeValue.ofKb(64L).getBytes() * randomIntBetween(1, 15);
81+
return new AzureStorageService(settings, azureClientProvider) {
82+
@Override
83+
long getUploadBlockSize() {
84+
return blockSize;
85+
}
86+
};
87+
}
88+
}
89+
6190
@ClassRule
6291
public static AzureHttpFixture fixture = new AzureHttpFixture(
6392
USE_FIXTURE ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.NONE,
@@ -71,7 +100,7 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
71100

72101
@Override
73102
protected Collection<Class<? extends Plugin>> getPlugins() {
74-
return pluginList(AzureRepositoryPlugin.class);
103+
return pluginList(TestAzureRepositoryPlugin.class);
75104
}
76105

77106
@Override
@@ -158,19 +187,70 @@ private void ensureSasTokenPermissions() {
158187

159188
public void testMultiBlockUpload() throws Exception {
160189
final BlobStoreRepository repo = getRepository();
190+
assertThat(
191+
asInstanceOf(AzureBlobStore.class, repo.blobStore()).getLargeBlobThresholdInBytes(),
192+
equalTo(ByteSizeUnit.MB.toBytes(1L))
193+
);
194+
assertThat(
195+
asInstanceOf(AzureBlobStore.class, repo.blobStore()).getUploadBlockSize(),
196+
lessThan(ByteSizeUnit.MB.toBytes(1L))
197+
);
198+
161199
// The configured threshold for this test suite is 1mb
162-
final int blobSize = ByteSizeUnit.MB.toIntBytes(2);
200+
final long blobSize = randomLongBetween(ByteSizeUnit.MB.toBytes(2), ByteSizeUnit.MB.toBytes(4));
201+
final int bufferSize = 8192;
202+
203+
final var file = createTempFile();
204+
final long expectedChecksum;
205+
try (var output = new CheckedOutputStream(new BufferedOutputStream(Files.newOutputStream(file)), new CRC32())) {
206+
long remaining = blobSize;
207+
while (remaining > 0L) {
208+
final var buffer = randomByteArrayOfLength(Math.toIntExact(Math.min(bufferSize, remaining)));
209+
output.write(buffer);
210+
remaining -= buffer.length;
211+
}
212+
output.flush();
213+
expectedChecksum = output.getChecksum().getValue();
214+
}
215+
163216
PlainActionFuture<Void> future = new PlainActionFuture<>();
164217
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
165218
final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
166-
blobContainer.writeBlob(
167-
randomPurpose(),
168-
UUIDs.base64UUID(),
169-
new ByteArrayInputStream(randomByteArrayOfLength(blobSize)),
170-
blobSize,
171-
false
172-
);
173-
blobContainer.delete(randomPurpose());
219+
try {
220+
final var blobName = UUIDs.base64UUID();
221+
if (randomBoolean()) {
222+
try (var input = new BufferedInputStream(Files.newInputStream(file))) {
223+
blobContainer.writeBlob(randomPurpose(), blobName, input, blobSize, false);
224+
}
225+
} else {
226+
assertThat(blobContainer.supportsConcurrentMultipartUploads(), equalTo(true));
227+
blobContainer.writeBlobAtomic(randomPurpose(), blobName, blobSize, (offset, length) -> {
228+
var channel = Files.newByteChannel(file);
229+
if (offset > 0L) {
230+
if (channel.size() <= offset) {
231+
throw new AssertionError();
232+
}
233+
channel.position(offset);
234+
}
235+
assert channel.position() == offset;
236+
return new BufferedInputStream(limitStream(Channels.newInputStream(channel), length));
237+
}, false);
238+
}
239+
240+
long bytesCount = 0L;
241+
try (var input = new CheckedInputStream(blobContainer.readBlob(OperationPurpose.INDICES, blobName), new CRC32())) {
242+
var buffer = new byte[bufferSize];
243+
int bytesRead;
244+
while ((bytesRead = input.read(buffer)) != -1) {
245+
bytesCount += bytesRead;
246+
}
247+
248+
assertThat(bytesCount, equalTo(blobSize));
249+
assertThat(input.getChecksum().getValue(), equalTo(expectedChecksum));
250+
}
251+
} finally {
252+
blobContainer.delete(randomPurpose());
253+
}
174254
}));
175255
future.get();
176256
}

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,12 +504,10 @@ void writeBlobAtomic(
504504
.collect(Collectors.toList())
505505
.flatMap(blockIds -> {
506506
logger.debug("{}: all {} parts uploaded, now committing", blobName, multiParts.size());
507-
var response = asyncClient.commitBlockList(
507+
return asyncClient.commitBlockList(
508508
multiParts.stream().map(MultiPart::blockId).toList(),
509509
failIfAlreadyExists == false
510-
);
511-
logger.debug("{}: all {} parts committed", blobName, multiParts.size());
512-
return response;
510+
).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
513511
})
514512
.block();
515513
}

0 commit comments

Comments
 (0)