Skip to content

Commit ae752b6

Browse files
feat(s3stream): change the checksum algorithm to crc32c (#1860)
* feat(s3stream): change the checksum algorithm to crc32c
  Signed-off-by: SSpirits <[email protected]>
* feat(s3stream): introduce aws crt
  Signed-off-by: SSpirits <[email protected]>
* feat(s3stream): completely disable the MD5 checksum
  Signed-off-by: SSpirits <[email protected]>
* feat(s3stream): support change checksum algorithm
  Signed-off-by: SSpirits <[email protected]>
* fix(s3stream): make spotbugs happy
  Signed-off-by: SSpirits <[email protected]>
---------
Signed-off-by: SSpirits <[email protected]>
1 parent bea20e3 commit ae752b6

File tree

4 files changed

+74
-14
lines changed

4 files changed

+74
-14
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,6 +2073,7 @@ project(':s3stream') {
20732073
implementation 'com.yammer.metrics:metrics-core:2.2.0'
20742074
implementation 'commons-codec:commons-codec:1.17.0'
20752075
implementation 'org.hdrhistogram:HdrHistogram:2.2.2'
2076+
implementation 'software.amazon.awssdk.crt:aws-crt:0.30.8'
20762077

20772078
testImplementation 'org.slf4j:slf4j-simple:2.0.9'
20782079
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'

s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -919,10 +919,12 @@ public List<String> getErrorsMessages() {
919919
public static class ObjectStorageCompletedPart {
920920
private final int partNumber;
921921
private final String partId;
922+
private final String checkSum;
922923

923-
public ObjectStorageCompletedPart(int partNumber, String partId) {
924+
public ObjectStorageCompletedPart(int partNumber, String partId, String checkSum) {
924925
this.partNumber = partNumber;
925926
this.partId = partId;
927+
this.checkSum = checkSum;
926928
}
927929

928930
public int getPartNumber() {
@@ -932,5 +934,9 @@ public int getPartNumber() {
932934
public String getPartId() {
933935
return partId;
934936
}
937+
938+
public String getCheckSum() {
939+
return checkSum;
940+
}
935941
}
936942
}

s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.buffer.CompositeByteBuf;
2020
import io.netty.buffer.Unpooled;
2121
import io.netty.handler.ssl.OpenSsl;
22-
2322
import java.net.URI;
2423
import java.nio.charset.StandardCharsets;
2524
import java.time.Duration;
@@ -32,7 +31,6 @@
3231
import java.util.concurrent.CompletableFuture;
3332
import java.util.function.Supplier;
3433
import java.util.stream.Collectors;
35-
3634
import org.apache.commons.lang3.StringUtils;
3735
import org.apache.commons.lang3.tuple.Pair;
3836
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
@@ -51,6 +49,8 @@
5149
import software.amazon.awssdk.regions.Region;
5250
import software.amazon.awssdk.services.s3.S3AsyncClient;
5351
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
52+
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
53+
import software.amazon.awssdk.services.s3.model.ChecksumMode;
5454
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
5555
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
5656
import software.amazon.awssdk.services.s3.model.CompletedPart;
@@ -83,6 +83,7 @@ public class AwsObjectStorage extends AbstractObjectStorage {
8383
public static final String AUTH_TYPE_KEY = "authType";
8484
public static final String STATIC_AUTH_TYPE = "static";
8585
public static final String INSTANCE_AUTH_TYPE = "instance";
86+
public static final String CHECKSUM_ALGORITHM_KEY = "checksumAlgorithm";
8687

8788
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
8889
// The maximum number of keys that can be deleted in a single request is 1000.
@@ -93,6 +94,8 @@ public class AwsObjectStorage extends AbstractObjectStorage {
9394
private final S3AsyncClient readS3Client;
9495
private final S3AsyncClient writeS3Client;
9596

97+
private final ChecksumAlgorithm checksumAlgorithm;
98+
9699
private volatile static InstanceProfileCredentialsProvider instanceProfileCredentialsProvider;
97100

98101
public AwsObjectStorage(BucketURI bucketURI, Map<String, String> tagging,
@@ -102,6 +105,13 @@ public AwsObjectStorage(BucketURI bucketURI, Map<String, String> tagging,
102105
this.bucket = bucketURI.bucket();
103106
this.tagging = tagging(tagging);
104107
List<AwsCredentialsProvider> credentialsProviders = credentialsProviders();
108+
109+
ChecksumAlgorithm checksumAlgorithm = ChecksumAlgorithm.fromValue(bucketURI.extensionString(CHECKSUM_ALGORITHM_KEY));
110+
if (checksumAlgorithm == null) {
111+
checksumAlgorithm = ChecksumAlgorithm.UNKNOWN_TO_SDK_VERSION;
112+
}
113+
this.checksumAlgorithm = checksumAlgorithm;
114+
105115
Supplier<S3AsyncClient> clientSupplier = () -> newS3Client(bucketURI.endpoint(), bucketURI.region(), bucketURI.extensionBool(PATH_STYLE_KEY, false), credentialsProviders, getMaxObjectStorageConcurrency());
106116
this.writeS3Client = clientSupplier.get();
107117
this.readS3Client = readWriteIsolate ? clientSupplier.get() : writeS3Client;
@@ -114,6 +124,7 @@ public AwsObjectStorage(S3AsyncClient s3Client, String bucket) {
114124
this.writeS3Client = s3Client;
115125
this.readS3Client = s3Client;
116126
this.tagging = null;
127+
this.checksumAlgorithm = ChecksumAlgorithm.UNKNOWN_TO_SDK_VERSION;
117128
}
118129

119130
public static Builder builder() {
@@ -144,9 +155,14 @@ static void checkDeleteObjectsResponse(DeleteObjectsResponse response) throws Ex
144155

145156
@Override
146157
CompletableFuture<ByteBuf> doRangeRead(ReadOptions options, String path, long start, long end) {
147-
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
158+
GetObjectRequest.Builder builder = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end));
159+
160+
if (checksumAlgorithm != ChecksumAlgorithm.UNKNOWN_TO_SDK_VERSION) {
161+
builder.checksumMode(ChecksumMode.ENABLED);
162+
}
163+
148164
CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
149-
readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
165+
readS3Client.getObject(builder.build(), AsyncResponseTransformer.toPublisher())
150166
.thenAccept(responsePublisher -> {
151167
CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
152168
responsePublisher.subscribe(bytes -> {
@@ -174,6 +190,11 @@ CompletableFuture<Void> doWrite(WriteOptions options, String path, ByteBuf data)
174190
if (null != tagging) {
175191
builder.tagging(tagging);
176192
}
193+
194+
if (checksumAlgorithm != ChecksumAlgorithm.UNKNOWN_TO_SDK_VERSION) {
195+
builder.checksumAlgorithm(checksumAlgorithm);
196+
}
197+
177198
PutObjectRequest request = builder.build();
178199
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
179200
return writeS3Client.putObject(request, body).thenApply(rst -> null);
@@ -185,6 +206,11 @@ CompletableFuture<String> doCreateMultipartUpload(WriteOptions options, String p
185206
if (null != tagging) {
186207
builder.tagging(tagging);
187208
}
209+
210+
if (checksumAlgorithm != ChecksumAlgorithm.UNKNOWN_TO_SDK_VERSION) {
211+
builder.checksumAlgorithm(checksumAlgorithm);
212+
}
213+
188214
CreateMultipartUploadRequest request = builder.build();
189215
return writeS3Client.createMultipartUpload(request).thenApply(CreateMultipartUploadResponse::uploadId);
190216
}
@@ -193,10 +219,37 @@ CompletableFuture<String> doCreateMultipartUpload(WriteOptions options, String p
193219
CompletableFuture<ObjectStorageCompletedPart> doUploadPart(WriteOptions options, String path, String uploadId,
194220
int partNumber, ByteBuf part) {
195221
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
196-
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
197-
.partNumber(partNumber).build();
198-
return writeS3Client.uploadPart(request, body)
199-
.thenApply(resp -> new ObjectStorageCompletedPart(partNumber, resp.eTag()));
222+
UploadPartRequest.Builder builder = UploadPartRequest.builder()
223+
.bucket(bucket)
224+
.key(path)
225+
.uploadId(uploadId)
226+
.partNumber(partNumber);
227+
228+
if (checksumAlgorithm != ChecksumAlgorithm.UNKNOWN_TO_SDK_VERSION) {
229+
builder.checksumAlgorithm(checksumAlgorithm);
230+
}
231+
232+
return writeS3Client.uploadPart(builder.build(), body)
233+
.thenApply(resp -> {
234+
String checksum;
235+
switch (checksumAlgorithm) {
236+
case CRC32_C:
237+
checksum = resp.checksumCRC32C();
238+
break;
239+
case CRC32:
240+
checksum = resp.checksumCRC32();
241+
break;
242+
case SHA1:
243+
checksum = resp.checksumSHA1();
244+
break;
245+
case SHA256:
246+
checksum = resp.checksumSHA256();
247+
break;
248+
default:
249+
checksum = null;
250+
}
251+
return new ObjectStorageCompletedPart(partNumber, resp.eTag(), checksum);
252+
});
200253
}
201254

202255
@Override
@@ -210,14 +263,14 @@ CompletableFuture<ObjectStorageCompletedPart> doUploadPartCopy(WriteOptions opti
210263
.apiCallTimeout(Duration.ofMillis(options.apiCallAttemptTimeout())).build()
211264
)
212265
.build();
213-
return writeS3Client.uploadPartCopy(request).thenApply(resp -> new ObjectStorageCompletedPart(partNumber, resp.copyPartResult().eTag()));
266+
return writeS3Client.uploadPartCopy(request).thenApply(resp -> new ObjectStorageCompletedPart(partNumber, resp.copyPartResult().eTag(), resp.copyPartResult().checksumCRC32C()));
214267
}
215268

216269
@Override
217270
public CompletableFuture<Void> doCompleteMultipartUpload(WriteOptions options, String path, String uploadId,
218271
List<ObjectStorageCompletedPart> parts) {
219272
List<CompletedPart> completedParts = parts.stream()
220-
.map(part -> CompletedPart.builder().partNumber(part.getPartNumber()).eTag(part.getPartId()).build())
273+
.map(part -> CompletedPart.builder().partNumber(part.getPartNumber()).eTag(part.getPartId()).checksumCRC32C(part.getCheckSum()).build())
221274
.collect(Collectors.toList());
222275
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(completedParts).build();
223276
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public void testWrite_onePart() {
6060
@Test
6161
public void testWrite_dataLargerThanMaxUploadSize() {
6262
when(operator.createMultipartUpload(any(), eq("testpath"))).thenReturn(CompletableFuture.completedFuture("test_upload_id"));
63-
when(operator.uploadPart(any(), eq("testpath"), eq("test_upload_id"), eq(1), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(1, "etag1")));
64-
when(operator.uploadPart(any(), eq("testpath"), eq("test_upload_id"), eq(2), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(2, "etag2")));
63+
when(operator.uploadPart(any(), eq("testpath"), eq("test_upload_id"), eq(1), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(1, "etag1", "checksum1")));
64+
when(operator.uploadPart(any(), eq("testpath"), eq("test_upload_id"), eq(2), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(2, "etag2", "checksum2")));
6565
when(operator.completeMultipartUpload(any(), eq("testpath"), eq("test_upload_id"), any())).thenReturn(CompletableFuture.completedFuture(null));
6666
writer.write(TestUtils.random(17 * 1024 * 1024));
6767
assertTrue(writer.hasBatchingPart());
@@ -80,7 +80,7 @@ public void testWrite_dataLargerThanMaxUploadSize() {
8080
public void testWrite_copyWrite() {
8181
when(operator.createMultipartUpload(any(), eq("testpath"))).thenReturn(CompletableFuture.completedFuture("test_upload_id"));
8282
when(operator.uploadPartCopy(any(), eq("test_src_path"), eq("testpath"), eq(0L), eq(15L * 1024 * 1024), eq("test_upload_id"), eq(1)))
83-
.thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(1, "etag1")));
83+
.thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(1, "etag1", "checksum1")));
8484
when(operator.completeMultipartUpload(any(), eq("testpath"), eq("test_upload_id"), any())).thenReturn(CompletableFuture.completedFuture(null));
8585

8686
S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(1, 15 * 1024 * 1024, S3ObjectType.STREAM);

0 commit comments

Comments
 (0)