Skip to content

Commit e5cdc58

Browse files
authored
Add integration test for concurrent multipart uploads on Azure (#128503)
Enhances existing integration test to account for #128449. Relates ES-11815
1 parent 6bc1452 commit e5cdc58

File tree

2 files changed

+90
-15
lines changed

2 files changed

+90
-15
lines changed

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java

Lines changed: 88 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,21 @@
3939
import org.elasticsearch.rest.RestStatus;
4040
import org.junit.ClassRule;
4141

42-
import java.io.ByteArrayInputStream;
42+
import java.io.BufferedInputStream;
43+
import java.io.BufferedOutputStream;
4344
import java.net.HttpURLConnection;
45+
import java.nio.channels.Channels;
46+
import java.nio.file.Files;
4447
import java.util.Collection;
48+
import java.util.zip.CRC32;
49+
import java.util.zip.CheckedInputStream;
50+
import java.util.zip.CheckedOutputStream;
4551

52+
import static org.elasticsearch.common.io.Streams.limitStream;
4653
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
4754
import static org.hamcrest.Matchers.blankOrNullString;
4855
import static org.hamcrest.Matchers.equalTo;
56+
import static org.hamcrest.Matchers.lessThan;
4957
import static org.hamcrest.Matchers.not;
5058

5159
/**
@@ -58,6 +66,27 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
5866

5967
private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");
6068

69+
/**
70+
* AzureRepositoryPlugin that sets a low value for getUploadBlockSize()
71+
*/
72+
public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {
73+
74+
public TestAzureRepositoryPlugin(Settings settings) {
75+
super(settings);
76+
}
77+
78+
@Override
79+
AzureStorageService createAzureStorageService(Settings settings, AzureClientProvider azureClientProvider) {
80+
final long blockSize = ByteSizeValue.ofKb(64L).getBytes() * randomIntBetween(1, 15);
81+
return new AzureStorageService(settings, azureClientProvider) {
82+
@Override
83+
long getUploadBlockSize() {
84+
return blockSize;
85+
}
86+
};
87+
}
88+
}
89+
6190
@ClassRule
6291
public static AzureHttpFixture fixture = new AzureHttpFixture(
6392
USE_FIXTURE ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.NONE,
@@ -71,7 +100,7 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
71100

72101
@Override
73102
protected Collection<Class<? extends Plugin>> getPlugins() {
74-
return pluginList(AzureRepositoryPlugin.class);
103+
return pluginList(TestAzureRepositoryPlugin.class);
75104
}
76105

77106
@Override
@@ -158,19 +187,67 @@ private void ensureSasTokenPermissions() {
158187

159188
public void testMultiBlockUpload() throws Exception {
160189
final BlobStoreRepository repo = getRepository();
190+
assertThat(
191+
asInstanceOf(AzureBlobStore.class, repo.blobStore()).getLargeBlobThresholdInBytes(),
192+
equalTo(ByteSizeUnit.MB.toBytes(1L))
193+
);
194+
assertThat(asInstanceOf(AzureBlobStore.class, repo.blobStore()).getUploadBlockSize(), lessThan(ByteSizeUnit.MB.toBytes(1L)));
195+
161196
// The configured threshold for this test suite is 1mb
162-
final int blobSize = ByteSizeUnit.MB.toIntBytes(2);
197+
final long blobSize = randomLongBetween(ByteSizeUnit.MB.toBytes(2), ByteSizeUnit.MB.toBytes(4));
198+
final int bufferSize = 8192;
199+
200+
final var file = createTempFile();
201+
final long expectedChecksum;
202+
try (var output = new CheckedOutputStream(new BufferedOutputStream(Files.newOutputStream(file)), new CRC32())) {
203+
long remaining = blobSize;
204+
while (remaining > 0L) {
205+
final var buffer = randomByteArrayOfLength(Math.toIntExact(Math.min(bufferSize, remaining)));
206+
output.write(buffer);
207+
remaining -= buffer.length;
208+
}
209+
output.flush();
210+
expectedChecksum = output.getChecksum().getValue();
211+
}
212+
163213
PlainActionFuture<Void> future = new PlainActionFuture<>();
164214
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
165215
final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
166-
blobContainer.writeBlob(
167-
randomPurpose(),
168-
UUIDs.base64UUID(),
169-
new ByteArrayInputStream(randomByteArrayOfLength(blobSize)),
170-
blobSize,
171-
false
172-
);
173-
blobContainer.delete(randomPurpose());
216+
try {
217+
final var blobName = UUIDs.base64UUID();
218+
if (randomBoolean()) {
219+
try (var input = new BufferedInputStream(Files.newInputStream(file))) {
220+
blobContainer.writeBlob(randomPurpose(), blobName, input, blobSize, false);
221+
}
222+
} else {
223+
assertThat(blobContainer.supportsConcurrentMultipartUploads(), equalTo(true));
224+
blobContainer.writeBlobAtomic(randomPurpose(), blobName, blobSize, (offset, length) -> {
225+
var channel = Files.newByteChannel(file);
226+
if (offset > 0L) {
227+
if (channel.size() <= offset) {
228+
throw new AssertionError();
229+
}
230+
channel.position(offset);
231+
}
232+
assert channel.position() == offset;
233+
return new BufferedInputStream(limitStream(Channels.newInputStream(channel), length));
234+
}, false);
235+
}
236+
237+
long bytesCount = 0L;
238+
try (var input = new CheckedInputStream(blobContainer.readBlob(OperationPurpose.INDICES, blobName), new CRC32())) {
239+
var buffer = new byte[bufferSize];
240+
int bytesRead;
241+
while ((bytesRead = input.read(buffer)) != -1) {
242+
bytesCount += bytesRead;
243+
}
244+
245+
assertThat(bytesCount, equalTo(blobSize));
246+
assertThat(input.getChecksum().getValue(), equalTo(expectedChecksum));
247+
}
248+
} finally {
249+
blobContainer.delete(randomPurpose());
250+
}
174251
}));
175252
future.get();
176253
}

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,12 +504,10 @@ void writeBlobAtomic(
504504
.collect(Collectors.toList())
505505
.flatMap(blockIds -> {
506506
logger.debug("{}: all {} parts uploaded, now committing", blobName, multiParts.size());
507-
var response = asyncClient.commitBlockList(
507+
return asyncClient.commitBlockList(
508508
multiParts.stream().map(MultiPart::blockId).toList(),
509509
failIfAlreadyExists == false
510-
);
511-
logger.debug("{}: all {} parts committed", blobName, multiParts.size());
512-
return response;
510+
).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
513511
})
514512
.block();
515513
}

0 commit comments

Comments
 (0)