Skip to content

Commit 5f3eb6f

Browse files
committed
Draft for testing: Supply Content-MD5 to S3 uploads
This is just to kick off the s3 third party test suite.
1 parent 1d5706e commit 5f3eb6f

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,11 @@
6969
import java.io.IOException;
7070
import java.io.InputStream;
7171
import java.io.OutputStream;
72+
import java.security.MessageDigest;
73+
import java.security.NoSuchAlgorithmException;
7274
import java.time.Instant;
7375
import java.util.ArrayList;
76+
import java.util.Base64;
7477
import java.util.Date;
7578
import java.util.Iterator;
7679
import java.util.List;
@@ -189,13 +192,18 @@ private void flushBuffer(boolean lastPart) throws IOException {
189192
}
190193
}
191194
assert lastPart == false || successful : "must only write last part if successful";
195+
final var byteStream = buffer.bytes().streamInput();
196+
byteStream.mark(0);
197+
final var md5 = md5DigestOfInputStream(byteStream, Long.MAX_VALUE);
198+
byteStream.reset();
192199
final UploadPartRequest uploadRequest = createPartUploadRequest(
193200
purpose,
194-
buffer.bytes().streamInput(),
201+
byteStream,
195202
uploadId.get(),
196203
parts.size() + 1,
197204
absoluteBlobKey,
198205
buffer.size(),
206+
md5,
199207
lastPart
200208
);
201209
final UploadPartResult uploadResponse;
@@ -260,6 +268,7 @@ private UploadPartRequest createPartUploadRequest(
260268
int number,
261269
String blobName,
262270
long size,
271+
byte[] partMd5,
263272
boolean lastPart
264273
) {
265274
final UploadPartRequest uploadRequest = new UploadPartRequest();
@@ -270,6 +279,7 @@ private UploadPartRequest createPartUploadRequest(
270279
uploadRequest.setInputStream(stream);
271280
S3BlobStore.configureRequestForMetrics(uploadRequest, blobStore, Operation.PUT_MULTIPART_OBJECT, purpose);
272281
uploadRequest.setPartSize(size);
282+
uploadRequest.setMd5Digest(Base64.getEncoder().encodeToString(partMd5));
273283
uploadRequest.setLastPart(lastPart);
274284
return uploadRequest;
275285
}
@@ -457,8 +467,18 @@ void executeSingleUpload(
457467
if (blobSize > s3BlobStore.bufferSizeInBytes()) {
458468
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
459469
}
470+
// required to reset the stream for MD5 calculation
471+
if (input.markSupported() == false) {
472+
throw new IllegalArgumentException("input stream mark not supported");
473+
}
460474

461475
final ObjectMetadata md = new ObjectMetadata();
476+
477+
input.mark(0);
478+
final byte[] md5 = md5DigestOfInputStream(input, Long.MAX_VALUE);
479+
input.reset();
480+
md.setContentMD5(Base64.getEncoder().encodeToString(md5));
481+
462482
md.setContentLength(blobSize);
463483
if (s3BlobStore.serverSideEncryption()) {
464484
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
@@ -485,7 +505,6 @@ void executeMultipartUpload(
485505
final InputStream input,
486506
final long blobSize
487507
) throws IOException {
488-
489508
ensureMultiPartUploadSize(blobSize);
490509
final long partSize = s3BlobStore.bufferSizeInBytes();
491510
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
@@ -498,6 +517,19 @@ void executeMultipartUpload(
498517
final long lastPartSize = multiparts.v2();
499518
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
500519

520+
// required to reset the stream for MD5 calculation
521+
if (input.markSupported() == false) {
522+
throw new IllegalArgumentException("input stream mark not supported");
523+
}
524+
525+
final byte[][] md5s = new byte[nbParts][];
526+
input.mark(0);
527+
for (int i = 0; i < nbParts - 1; i++) {
528+
md5s[i] = md5DigestOfInputStream(input, partSize);
529+
}
530+
md5s[nbParts - 1] = md5DigestOfInputStream(input, lastPartSize);
531+
input.reset();
532+
501533
final SetOnce<String> uploadId = new SetOnce<>();
502534
final String bucketName = s3BlobStore.bucket();
503535
boolean success = false;
@@ -525,6 +557,7 @@ void executeMultipartUpload(
525557
i,
526558
blobName,
527559
lastPart ? lastPartSize : partSize,
560+
md5s[i - 1],
528561
lastPart
529562
);
530563
bytesCount += uploadRequest.getPartSize();
@@ -564,6 +597,28 @@ void executeMultipartUpload(
564597
}
565598
}
566599

600+
// Calculate the MD5 of up to remaining bytes of the given InputStream
601+
private byte[] md5DigestOfInputStream(final InputStream inputStream, long remaining) throws IOException {
602+
try {
603+
final MessageDigest md5 = MessageDigest.getInstance("MD5");
604+
// update in chunks to bound memory usage while amortizing read cost
605+
byte[] buffer = new byte[65536];
606+
int bytesRead;
607+
do {
608+
final int toRead = (int) Math.min(remaining, buffer.length);
609+
bytesRead = inputStream.read(buffer, 0, toRead);
610+
if (bytesRead > 0) {
611+
md5.update(buffer, 0, bytesRead);
612+
remaining -= bytesRead;
613+
}
614+
} while (bytesRead > 0);
615+
616+
return md5.digest();
617+
} catch (NoSuchAlgorithmException e) {
618+
throw new IOException(e);
619+
}
620+
}
621+
567622
// non-static, package private for testing
568623
void ensureMultiPartUploadSize(final long blobSize) {
569624
if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) {

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public void testExecuteMultipartUpload() throws IOException {
213213
final ArgumentCaptor<CompleteMultipartUploadRequest> compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
214214
when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult());
215215

216-
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
216+
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[1]);
217217
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
218218
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
219219

0 commit comments

Comments
 (0)