6969import java .io .IOException ;
7070import java .io .InputStream ;
7171import java .io .OutputStream ;
72+ import java .security .MessageDigest ;
73+ import java .security .NoSuchAlgorithmException ;
7274import java .time .Instant ;
7375import java .util .ArrayList ;
76+ import java .util .Base64 ;
7477import java .util .Date ;
7578import java .util .Iterator ;
7679import java .util .List ;
@@ -189,13 +192,17 @@ private void flushBuffer(boolean lastPart) throws IOException {
189192 }
190193 }
191194 assert lastPart == false || successful : "must only write last part if successful" ;
195+ final var byteStream = buffer .bytes ().streamInput ();
196+ final var md5 = md5DigestOfInputStream (byteStream , Long .MAX_VALUE );
197+ byteStream .reset ();
192198 final UploadPartRequest uploadRequest = createPartUploadRequest (
193199 purpose ,
194- buffer . bytes (). streamInput () ,
200+ byteStream ,
195201 uploadId .get (),
196202 parts .size () + 1 ,
197203 absoluteBlobKey ,
198204 buffer .size (),
205+ md5 ,
199206 lastPart
200207 );
201208 final UploadPartResult uploadResponse ;
@@ -260,6 +267,7 @@ private UploadPartRequest createPartUploadRequest(
260267 int number ,
261268 String blobName ,
262269 long size ,
270+ byte [] partMd5 ,
263271 boolean lastPart
264272 ) {
265273 final UploadPartRequest uploadRequest = new UploadPartRequest ();
@@ -270,6 +278,7 @@ private UploadPartRequest createPartUploadRequest(
270278 uploadRequest .setInputStream (stream );
271279 S3BlobStore .configureRequestForMetrics (uploadRequest , blobStore , Operation .PUT_MULTIPART_OBJECT , purpose );
272280 uploadRequest .setPartSize (size );
281+ uploadRequest .setMd5Digest (Base64 .getEncoder ().encodeToString (partMd5 ));
273282 uploadRequest .setLastPart (lastPart );
274283 return uploadRequest ;
275284 }
@@ -457,8 +466,17 @@ void executeSingleUpload(
457466 if (blobSize > s3BlobStore .bufferSizeInBytes ()) {
458467 throw new IllegalArgumentException ("Upload request size [" + blobSize + "] can't be larger than buffer size" );
459468 }
469+ // required to reset the stream for MD5 calculation
470+ if (input .markSupported () == false ) {
471+ throw new IllegalArgumentException ("input stream mark not supported" );
472+ }
460473
461474 final ObjectMetadata md = new ObjectMetadata ();
475+
476+ final byte [] md5 = md5DigestOfInputStream (input , Long .MAX_VALUE );
477+ input .reset ();
478+ md .setContentMD5 (Base64 .getEncoder ().encodeToString (md5 ));
479+
462480 md .setContentLength (blobSize );
463481 if (s3BlobStore .serverSideEncryption ()) {
464482 md .setSSEAlgorithm (ObjectMetadata .AES_256_SERVER_SIDE_ENCRYPTION );
@@ -485,7 +503,6 @@ void executeMultipartUpload(
485503 final InputStream input ,
486504 final long blobSize
487505 ) throws IOException {
488-
489506 ensureMultiPartUploadSize (blobSize );
490507 final long partSize = s3BlobStore .bufferSizeInBytes ();
491508 final Tuple <Long , Long > multiparts = numberOfMultiparts (blobSize , partSize );
@@ -498,6 +515,18 @@ void executeMultipartUpload(
498515 final long lastPartSize = multiparts .v2 ();
499516 assert blobSize == (((nbParts - 1 ) * partSize ) + lastPartSize ) : "blobSize does not match multipart sizes" ;
500517
518+ // required to reset the stream for MD5 calculation
519+ if (input .markSupported () == false ) {
520+ throw new IllegalArgumentException ("input stream mark not supported" );
521+ }
522+
523+ final byte [][] md5s = new byte [nbParts ][];
524+ for (int i = 0 ; i < nbParts - 1 ; i ++) {
525+ md5s [i ] = md5DigestOfInputStream (input , partSize );
526+ }
527+ md5s [nbParts - 1 ] = md5DigestOfInputStream (input , lastPartSize );
528+ input .reset ();
529+
501530 final SetOnce <String > uploadId = new SetOnce <>();
502531 final String bucketName = s3BlobStore .bucket ();
503532 boolean success = false ;
@@ -525,6 +554,7 @@ void executeMultipartUpload(
525554 i ,
526555 blobName ,
527556 lastPart ? lastPartSize : partSize ,
557+ md5s [i - 1 ],
528558 lastPart
529559 );
530560 bytesCount += uploadRequest .getPartSize ();
@@ -564,6 +594,28 @@ void executeMultipartUpload(
564594 }
565595 }
566596
597+ // Calculate the MD5 of up to remaining bytes of the given InputStream
598+ private byte [] md5DigestOfInputStream (final InputStream inputStream , long remaining ) throws IOException {
599+ try {
600+ final MessageDigest md5 = MessageDigest .getInstance ("MD5" );
601+ // update in chunks to bound memory usage while amortizing read cost
602+ byte [] buffer = new byte [65536 ];
603+ int bytesRead ;
604+ do {
605+ final int toRead = (int ) Math .min (remaining , buffer .length );
606+ bytesRead = inputStream .read (buffer , 0 , toRead );
607+ if (bytesRead > 0 ) {
608+ md5 .update (buffer , 0 , bytesRead );
609+ remaining -= bytesRead ;
610+ }
611+ } while (bytesRead > 0 );
612+
613+ return md5 .digest ();
614+ } catch (NoSuchAlgorithmException e ) {
615+ throw new IOException (e );
616+ }
617+ }
618+
567619 // non-static, package private for testing
568620 void ensureMultiPartUploadSize (final long blobSize ) {
569621 if (blobSize > MAX_FILE_SIZE_USING_MULTIPART .getBytes ()) {
0 commit comments