Skip to content

Commit c137348

Browse files
authored
Fixed an issue that could cause checksum mismatch errors in S3 uploads. (#5836)
* Fixed an issue that could cause checksum mismatch errors in S3 uploads. This error is sometimes encountered when a customer uses: 1. The AsyncS3Client 2. A ChecksumAlgorithm of SHA1 or SHA256 (instead of the default CRC32) 3. Parallel uploads The root cause was the SDK using thread locals to cache the SHA1 or SHA256 message digest implementations. This meant that if a single event loop thread was processing multiple requests, those two requests would use the same digest implementation to calculate the checksum. * Fixed an issue where SignerUtils was using the internal class of another module. * Update clone() failure message. Move to LinkedBlockingDeque for DigestAlgorithm cache. * Added direct testing of the DigestAlgorithm cache. * Fixed a transient issue that could occur when the first attempt fully sends the payload, and then a retry happens with checksums enabled.
1 parent 70ae0dd commit c137348

File tree

15 files changed

+376
-241
lines changed

15 files changed

+376
-241
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon S3",
4+
"contributor": "",
5+
"description": "Fixed an issue that could cause checksum mismatch errors when performing parallel uploads with the async S3 client and the SHA1 or SHA256 checksum algorithms selected."
6+
}

core/checksums/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@
6363
<artifactId>junit-jupiter</artifactId>
6464
<scope>test</scope>
6565
</dependency>
66+
<dependency>
67+
<groupId>org.assertj</groupId>
68+
<artifactId>assertj-core</artifactId>
69+
<scope>test</scope>
70+
</dependency>
6671
</dependencies>
6772

6873
<build>

core/checksums/src/main/java/software/amazon/awssdk/checksums/SdkChecksum.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
import software.amazon.awssdk.checksums.internal.Crc32Checksum;
2222
import software.amazon.awssdk.checksums.internal.Crc64NvmeChecksum;
2323
import software.amazon.awssdk.checksums.internal.CrcChecksumProvider;
24-
import software.amazon.awssdk.checksums.internal.Md5Checksum;
25-
import software.amazon.awssdk.checksums.internal.Sha1Checksum;
26-
import software.amazon.awssdk.checksums.internal.Sha256Checksum;
24+
import software.amazon.awssdk.checksums.internal.DigestAlgorithm;
25+
import software.amazon.awssdk.checksums.internal.DigestAlgorithmChecksum;
2726
import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm;
2827

2928
/**
@@ -43,11 +42,11 @@ static SdkChecksum forAlgorithm(ChecksumAlgorithm algorithm) {
4342
case "CRC32":
4443
return new Crc32Checksum();
4544
case "SHA1":
46-
return new Sha1Checksum();
45+
return new DigestAlgorithmChecksum(DigestAlgorithm.SHA1);
4746
case "SHA256":
48-
return new Sha256Checksum();
47+
return new DigestAlgorithmChecksum(DigestAlgorithm.SHA256);
4948
case "MD5":
50-
return new Md5Checksum();
49+
return new DigestAlgorithmChecksum(DigestAlgorithm.MD5);
5150
case "CRC64NVME":
5251
return new Crc64NvmeChecksum();
5352
default:

core/checksums/src/main/java/software/amazon/awssdk/checksums/internal/DigestAlgorithm.java

Lines changed: 81 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,51 +17,115 @@
1717

1818
import java.security.MessageDigest;
1919
import java.security.NoSuchAlgorithmException;
20+
import java.util.Deque;
21+
import java.util.concurrent.LinkedBlockingDeque;
22+
import java.util.function.Supplier;
2023
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
25+
import software.amazon.awssdk.utils.SdkAutoCloseable;
2126

2227
@SdkInternalApi
2328
public enum DigestAlgorithm {
24-
2529
SHA1("SHA-1"),
30+
2631
MD5("MD5"),
2732
SHA256("SHA-256")
2833
;
2934

35+
private static final Supplier<MessageDigest> CLOSED_DIGEST = () -> {
36+
throw new IllegalStateException("This message digest is closed.");
37+
};
38+
39+
private static final int MAX_CACHED_DIGESTS = 10_000;
3040
private final String algorithmName;
31-
private final DigestThreadLocal digestReference;
41+
private final Deque<MessageDigest> digestCache = new LinkedBlockingDeque<>(MAX_CACHED_DIGESTS); // LIFO
3242

3343
DigestAlgorithm(String algorithmName) {
3444
this.algorithmName = algorithmName;
35-
digestReference = new DigestThreadLocal(algorithmName);
3645
}
3746

3847
public String getAlgorithmName() {
3948
return algorithmName;
4049
}
4150

4251
/**
43-
* Returns the thread local reference for the {@link MessageDigest} algorithm
52+
* Returns a {@link CloseableMessageDigest} to use for this algorithm.
4453
*/
45-
public MessageDigest getDigest() {
46-
MessageDigest digest = digestReference.get();
47-
digest.reset();
48-
return digest;
54+
public CloseableMessageDigest getDigest() {
55+
MessageDigest digest = digestCache.pollFirst();
56+
if (digest != null) {
57+
digest.reset();
58+
return new CloseableMessageDigest(digest);
59+
}
60+
return new CloseableMessageDigest(newDigest());
4961
}
5062

51-
private static class DigestThreadLocal extends ThreadLocal<MessageDigest> {
52-
private final String algorithmName;
63+
private MessageDigest newDigest() {
64+
try {
65+
return MessageDigest.getInstance(algorithmName);
66+
} catch (NoSuchAlgorithmException e) {
67+
throw new RuntimeException("Unable to fetch message digest instance for Algorithm "
68+
+ algorithmName + ": " + e.getMessage(), e);
69+
}
70+
}
71+
72+
@SdkTestInternalApi
73+
static void clearCaches() {
74+
for (DigestAlgorithm value : values()) {
75+
value.digestCache.clear();
76+
}
77+
}
78+
79+
public final class CloseableMessageDigest implements SdkAutoCloseable, Cloneable {
80+
81+
private Supplier<MessageDigest> digest;
82+
private byte[] messageDigest;
83+
84+
private CloseableMessageDigest(MessageDigest digest) {
85+
this.digest = () -> digest;
86+
}
87+
88+
/**
89+
* Retrieve the message digest instance.
90+
*/
91+
public MessageDigest messageDigest() {
92+
return digest.get();
93+
}
94+
95+
/**
96+
* Retrieve the message digest bytes. This will close the message digest when invoked. This is because the underlying
97+
* message digest is reset on read, and we'd rather fail future interactions with the digest than act on the wrong data.
98+
*/
99+
public byte[] digest() {
100+
if (messageDigest != null) {
101+
return messageDigest;
102+
}
103+
messageDigest = messageDigest().digest();
104+
close();
105+
return messageDigest;
106+
}
107+
108+
/**
109+
* Release this message digest back to the cache. Once released, you must not use the digest anymore.
110+
*/
111+
@Override
112+
public void close() {
113+
if (digest == CLOSED_DIGEST) {
114+
return;
115+
}
116+
117+
// Drop this digest is the cache is full.
118+
digestCache.offerFirst(digest.get());
53119

54-
DigestThreadLocal(String algorithmName) {
55-
this.algorithmName = algorithmName;
120+
digest = CLOSED_DIGEST;
56121
}
57122

58123
@Override
59-
protected MessageDigest initialValue() {
124+
public CloseableMessageDigest clone() {
60125
try {
61-
return MessageDigest.getInstance(algorithmName);
62-
} catch (NoSuchAlgorithmException e) {
63-
throw new RuntimeException("Unable to fetch message digest instance for Algorithm "
64-
+ algorithmName + ": " + e.getMessage(), e);
126+
return new CloseableMessageDigest((MessageDigest) digest.get().clone());
127+
} catch (CloneNotSupportedException e) {
128+
throw new IllegalStateException("Clone was not supported by this digest type.", e);
65129
}
66130
}
67131
}

core/checksums/src/main/java/software/amazon/awssdk/checksums/internal/Md5Checksum.java renamed to core/checksums/src/main/java/software/amazon/awssdk/checksums/internal/DigestAlgorithmChecksum.java

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,39 @@
1515

1616
package software.amazon.awssdk.checksums.internal;
1717

18-
import java.security.MessageDigest;
1918
import software.amazon.awssdk.annotations.SdkInternalApi;
2019
import software.amazon.awssdk.checksums.SdkChecksum;
20+
import software.amazon.awssdk.checksums.internal.DigestAlgorithm.CloseableMessageDigest;
2121

2222
/**
23-
* Implementation of {@link SdkChecksum} to calculate an MD5 checksum.
23+
* An implementation of {@link SdkChecksum} that uses a {@link DigestAlgorithm}.
2424
*/
2525
@SdkInternalApi
26-
public class Md5Checksum implements SdkChecksum {
26+
public class DigestAlgorithmChecksum implements SdkChecksum {
2727

28-
private MessageDigest digest;
28+
private final DigestAlgorithm algorithm;
2929

30-
private MessageDigest digestLastMarked;
30+
private CloseableMessageDigest digest;
3131

32-
public Md5Checksum() {
33-
this.digest = getDigest();
32+
private CloseableMessageDigest digestLastMarked;
33+
34+
public DigestAlgorithmChecksum(DigestAlgorithm algorithm) {
35+
this.algorithm = algorithm;
36+
this.digest = newDigest();
37+
}
38+
39+
private CloseableMessageDigest newDigest() {
40+
return algorithm.getDigest();
3441
}
3542

3643
@Override
3744
public void update(int b) {
38-
digest.update((byte) b);
45+
digest.messageDigest().update((byte) b);
3946
}
4047

4148
@Override
4249
public void update(byte[] b, int off, int len) {
43-
digest.update(b, off, len);
50+
digest.messageDigest().update(b, off, len);
4451
}
4552

4653
@Override
@@ -50,15 +57,12 @@ public long getValue() {
5057

5158
@Override
5259
public void reset() {
53-
digest = (digestLastMarked == null)
54-
// This is necessary so that should there be a reset without a
55-
// preceding mark, the MD5 would still be computed correctly.
56-
? getDigest()
57-
: cloneFrom(digestLastMarked);
58-
}
59-
60-
private MessageDigest getDigest() {
61-
return DigestAlgorithm.MD5.getDigest();
60+
digest.close();
61+
if (digestLastMarked == null) {
62+
digest = newDigest();
63+
} else {
64+
digest = digestLastMarked;
65+
}
6266
}
6367

6468
@Override
@@ -68,14 +72,6 @@ public byte[] getChecksumBytes() {
6872

6973
@Override
7074
public void mark(int readLimit) {
71-
digestLastMarked = cloneFrom(digest);
72-
}
73-
74-
private MessageDigest cloneFrom(MessageDigest from) {
75-
try {
76-
return (MessageDigest) from.clone();
77-
} catch (CloneNotSupportedException e) { // should never occur
78-
throw new IllegalStateException("unexpected", e);
79-
}
75+
digestLastMarked = digest.clone();
8076
}
8177
}

core/checksums/src/main/java/software/amazon/awssdk/checksums/internal/Sha1Checksum.java

Lines changed: 0 additions & 81 deletions
This file was deleted.

0 commit comments

Comments
 (0)