diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java index 391894b28b39..c1fbdbbb42f4 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java @@ -31,6 +31,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.checksums.SdkChecksum; @@ -40,11 +42,13 @@ import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChecksumTrailerProvider; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedInputStream; +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPublisher; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.SigV4ChunkExtensionProvider; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.SigV4TrailerProvider; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.TrailerProvider; import software.amazon.awssdk.http.auth.aws.internal.signer.io.ChecksumInputStream; import software.amazon.awssdk.http.auth.aws.internal.signer.io.ResettableContentStreamProvider; +import software.amazon.awssdk.http.auth.aws.internal.signer.io.UnbufferedChecksumSubscriber; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Pair; import software.amazon.awssdk.utils.Validate; @@ -116,8 +120,44 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4RequestSignin @Override public Publisher signAsync(Publisher payload, V4RequestSigningResult requestSigningResult) { - // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage - throw new UnsupportedOperationException(); + ChunkedEncodedPublisher.Builder chunkedStreamBuilder = ChunkedEncodedPublisher.builder() + .publisher(payload) + .chunkSize(chunkSize) + .addEmptyTrailingChunk(true); + + preExistingTrailers.forEach(t -> chunkedStreamBuilder.addTrailer(() -> t)); + + SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest(); + + String checksum = request.firstMatchingHeader(X_AMZ_CONTENT_SHA256).orElseThrow( + () -> new IllegalArgumentException(X_AMZ_CONTENT_SHA256 + " must be set!") + ); + + switch (checksum) { + case STREAMING_SIGNED_PAYLOAD: { + RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSigningKey(), + requestSigningResult.getSignature()); + chunkedStreamBuilder.addExtension(new SigV4ChunkExtensionProvider(rollingSigner, credentialScope)); + break; + } + case STREAMING_UNSIGNED_PAYLOAD_TRAILER: + setupChecksumTrailerIfNeeded(chunkedStreamBuilder); + break; + case STREAMING_SIGNED_PAYLOAD_TRAILER: { + setupChecksumTrailerIfNeeded(chunkedStreamBuilder); + RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSigningKey(), + requestSigningResult.getSignature()); + chunkedStreamBuilder.addExtension(new SigV4ChunkExtensionProvider(rollingSigner, credentialScope)); + chunkedStreamBuilder.addTrailer( + new SigV4TrailerProvider(chunkedStreamBuilder.trailers(), rollingSigner, credentialScope) + ); + break; + } + default: + throw new UnsupportedOperationException(); + } + + return chunkedStreamBuilder.build(); } @Override @@ -127,27 +167,66 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider setupPreExistingTrailers(request); // pre-existing trailers + encodedContentLength = calculateEncodedContentLength(request, contentLength); + + if (checksumAlgorithm != null) { + String checksumHeaderName = checksumHeaderName(checksumAlgorithm); + request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); + } + request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); + request.appendHeader(CONTENT_ENCODING, AWS_CHUNKED); + } + + @Override + public CompletableFuture>> beforeSigningAsync( + SdkHttpRequest.Builder request, Publisher payload) { + return moveContentLength(request, payload) + .thenApply(p -> { + SdkHttpRequest.Builder requestBuilder = p.left(); + setupPreExistingTrailers(requestBuilder); + + long decodedContentLength = requestBuilder.firstMatchingHeader("x-amz-decoded-content-length") + .map(Long::parseLong) + // should not happen, this header is added by moveContentLength + .orElseThrow(() -> new RuntimeException("x-amz-decoded-content-length " + + "header not present")); + + long encodedContentLength = calculateEncodedContentLength(request, decodedContentLength); + + if (checksumAlgorithm != null) { + String checksumHeaderName = checksumHeaderName(checksumAlgorithm); + request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); + } + request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); + request.appendHeader(CONTENT_ENCODING, AWS_CHUNKED); + return Pair.of(requestBuilder, p.right()); + }); + } + + private long calculateEncodedContentLength(SdkHttpRequest.Builder requestBuilder, long decodedContentLength) { + long encodedContentLength = 0; + encodedContentLength += calculateExistingTrailersLength(); - String checksum = request.firstMatchingHeader(X_AMZ_CONTENT_SHA256).orElseThrow( + String checksum = requestBuilder.firstMatchingHeader(X_AMZ_CONTENT_SHA256).orElseThrow( () -> new IllegalArgumentException(X_AMZ_CONTENT_SHA256 + " must be set!") ); switch (checksum) { case STREAMING_SIGNED_PAYLOAD: { long extensionsLength = 81; // ;chunk-signature: - encodedContentLength += calculateChunksLength(contentLength, extensionsLength); + encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength); break; } case STREAMING_UNSIGNED_PAYLOAD_TRAILER: if (checksumAlgorithm != null) { encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); } - encodedContentLength += calculateChunksLength(contentLength, 0); + encodedContentLength += calculateChunksLength(decodedContentLength, 0); break; case STREAMING_SIGNED_PAYLOAD_TRAILER: { long extensionsLength = 81; // ;chunk-signature: - encodedContentLength += calculateChunksLength(contentLength, extensionsLength); + encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength); if (checksumAlgorithm != null) { encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); } @@ -161,12 +240,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider // terminating \r\n encodedContentLength += 2; - if (checksumAlgorithm != null) { - String checksumHeaderName = checksumHeaderName(checksumAlgorithm); - request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); - } - request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); - request.appendHeader(CONTENT_ENCODING, AWS_CHUNKED); + return encodedContentLength; } /** @@ -271,6 +345,24 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil builder.inputStream(checksumInputStream).addTrailer(checksumTrailer); } + private void setupChecksumTrailerIfNeeded(ChunkedEncodedPublisher.Builder builder) { + if (checksumAlgorithm != null) { + String checksumHeaderName = checksumHeaderName(checksumAlgorithm); + SdkChecksum sdkChecksum = fromChecksumAlgorithm(checksumAlgorithm); + Publisher checksummedPayload = computeChecksum(builder.publisher(), sdkChecksum); + + builder.publisher(checksummedPayload); + + TrailerProvider checksumTrailer = new ChecksumTrailerProvider(sdkChecksum, checksumHeaderName); + builder.addTrailer(checksumTrailer); + } + } + + private Publisher computeChecksum(Publisher publisher, SdkChecksum checksum) { + return subscriber -> publisher.subscribe( + new UnbufferedChecksumSubscriber(Collections.singletonList(checksum), subscriber)); + } + static class Builder { private CredentialScope credentialScope; private Integer chunkSize; diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/DefaultAwsV4HttpSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/DefaultAwsV4HttpSigner.java index 8ad79a12007e..198d82df0f0d 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/DefaultAwsV4HttpSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/DefaultAwsV4HttpSigner.java @@ -25,11 +25,13 @@ import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.PRESIGN_URL_MAX_EXPIRATION_DURATION; import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_TRAILER; +import java.nio.ByteBuffer; import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpRequest; @@ -70,7 +72,7 @@ public CompletableFuture signAsync(AsyncSignRequest request) { @@ -182,6 +184,8 @@ private static V4PayloadSigner v4PayloadAsyncSigner( boolean isPayloadSigning = request.requireProperty(PAYLOAD_SIGNING_ENABLED, true); boolean isEventStreaming = isEventStreaming(request.request()); boolean isChunkEncoding = request.requireProperty(CHUNK_ENCODING_ENABLED, false); + boolean isTrailing = request.request().firstMatchingHeader(X_AMZ_TRAILER).isPresent(); + boolean isFlexible = request.hasProperty(CHECKSUM_ALGORITHM) && !hasChecksumHeader(request); if (isEventStreaming) { if (isPayloadSigning) { @@ -194,12 +198,15 @@ private static V4PayloadSigner v4PayloadAsyncSigner( throw new UnsupportedOperationException("Unsigned payload is not supported with event-streaming."); } - if (isChunkEncoding && isPayloadSigning) { - // TODO(sra-identity-and-auth): We need to implement aws-chunk content-encoding for async. - // For now, we basically have to treat this as an unsigned case because there are existing s3 use-cases for - // Unsigned-payload + HTTP. These requests SHOULD be signed-payload, but are not pre-SRA, hence the problem. This - // will be taken care of in HttpChecksumStage for now, so we shouldn't throw an unsupported exception here, we - // should just fall through to the default since it will already encoded by the time it gets here. + if (isChunkEncoding) { + if (!isPayloadSigning) { + return AwsChunkedV4PayloadSigner.builder() + .credentialScope(properties.getCredentialScope()) + .chunkSize(DEFAULT_CHUNK_SIZE_IN_BYTES) + .checksumAlgorithm(request.property(CHECKSUM_ALGORITHM)) + .build(); + } + // TODO: support payload signing with chunked encoding return V4PayloadSigner.create(); } @@ -230,19 +237,26 @@ private static SignedRequest doSign(SignRequest doSign(AsyncSignRequest request, - Checksummer checksummer, - V4RequestSigner requestSigner, - V4PayloadSigner payloadSigner) { + private static CompletableFuture doSignAsync(AsyncSignRequest request, + Checksummer checksummer, + V4RequestSigner requestSigner, + V4PayloadSigner payloadSigner) { SdkHttpRequest.Builder requestBuilder = request.request().toBuilder(); - return checksummer.checksum(request.payload().orElse(null), requestBuilder) - .thenApply(payload -> { - V4RequestSigningResult requestSigningResultFuture = requestSigner.sign(requestBuilder); + Publisher payload = request.payload().orElse(null); + + return checksummer.checksum(payload, requestBuilder) + .thenCompose(checksummedPayload -> + payloadSigner.beforeSigningAsync(requestBuilder, checksummedPayload)) + .thenApply(p -> { + SdkHttpRequest.Builder requestToSign = p.left(); + Publisher payloadToSign = p.right(); + + V4RequestSigningResult requestSigningResult = requestSigner.sign(requestToSign); return AsyncSignedRequest.builder() - .request(requestSigningResultFuture.getSignedRequest().build()) - .payload(payloadSigner.signAsync(payload, requestSigningResultFuture)) + .request(requestSigningResult.getSignedRequest().build()) + .payload(payloadSigner.signAsync(payloadToSign, requestSigningResult)) .build(); }); } diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/V4PayloadSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/V4PayloadSigner.java index 189fbe420085..b396083df859 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/V4PayloadSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/V4PayloadSigner.java @@ -16,10 +16,12 @@ package software.amazon.awssdk.http.auth.aws.internal.signer; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpRequest; +import software.amazon.awssdk.utils.Pair; /** * An interface for defining how to sign a payload via SigV4. @@ -48,4 +50,9 @@ static V4PayloadSigner create() { */ default void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload) { } + + default CompletableFuture>> beforeSigningAsync( + SdkHttpRequest.Builder request, Publisher payload) { + return CompletableFuture.completedFuture(Pair.of(request, payload)); + } } diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java index b4805a78dca2..f0c47a6b7f36 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/chunkedencoding/ChunkedEncodedPublisher.java @@ -33,7 +33,7 @@ /** * An implementation of chunk-transfer encoding, but by wrapping a {@link Publisher} of {@link ByteBuffer}. This implementation - * supports chunk-headers, chunk-extensions. + * supports chunk-headers, chunk-extensions, and trailer-part. *

* Per RFC-7230, a chunk-transfer encoded message is * defined as: @@ -153,8 +153,7 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) { } int trailerLen = trailerData.stream() - // + 2 for each CRLF that ends the header-field - .mapToInt(t -> t.remaining() + 2) + .mapToInt(t -> t.remaining() + CRLF.length) .sum(); int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + trailerLen + CRLF.length; @@ -188,6 +187,7 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) { encoded.put(t); encoded.put(CRLF); }); + // empty line ends the request body encoded.put(CRLF); } @@ -304,6 +304,10 @@ public Builder publisher(Publisher publisher) { return this; } + public Publisher publisher() { + return publisher; + } + public Builder chunkSize(int chunkSize) { this.chunkSize = chunkSize; return this; @@ -324,6 +328,10 @@ public Builder addTrailer(TrailerProvider trailerProvider) { return this; } + public List trailers() { + return trailers; + } + public ChunkedEncodedPublisher build() { return new ChunkedEncodedPublisher(this); } diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriber.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriber.java new file mode 100644 index 000000000000..1d094782004a --- /dev/null +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriber.java @@ -0,0 +1,68 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.auth.aws.internal.signer.io; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.checksums.SdkChecksum; + +/** + * A decorating {@code Subscriber} that updates a list of {@code SdkChecksum}s with the data of each buffer given to {@code + * onNext}. + *

+ * This is "unbuffered", as opposed to {@link ChecksumSubscriber} which does buffer the data. + */ +@SdkInternalApi +public class UnbufferedChecksumSubscriber implements Subscriber { + private final List checksums; + private final Subscriber wrapped; + + public UnbufferedChecksumSubscriber(List checksums, Subscriber wrapped) { + this.checksums = new ArrayList<>(checksums); + this.wrapped = wrapped; + } + + @Override + public void onSubscribe(Subscription subscription) { + if (subscription == null) { + throw new NullPointerException("subscription is null"); + } + wrapped.onSubscribe(subscription); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + checksums.forEach(ck -> ck.update(byteBuffer.duplicate())); + wrapped.onNext(byteBuffer); + } + + @Override + public void onError(Throwable throwable) { + if (throwable == null) { + throw new NullPointerException("throwable is null"); + } + wrapped.onError(throwable); + } + + @Override + public void onComplete() { + wrapped.onComplete(); + } +} diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/LengthCalculatingSubscriber.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/LengthCalculatingSubscriber.java new file mode 100644 index 000000000000..54010862855f --- /dev/null +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/LengthCalculatingSubscriber.java @@ -0,0 +1,56 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.auth.aws.internal.signer.util; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +class LengthCalculatingSubscriber implements Subscriber { + private final CompletableFuture contentLengthFuture = new CompletableFuture<>(); + private Subscription subscription; + private long length = 0; + + @Override + public void onSubscribe(Subscription subscription) { + if (this.subscription == null) { + this.subscription = subscription; + this.subscription.request(Long.MAX_VALUE); + } else { + subscription.cancel(); + } + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + length += byteBuffer.remaining(); + } + + @Override + public void onError(Throwable throwable) { + contentLengthFuture.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + contentLengthFuture.complete(length); + } + + public CompletableFuture contentLengthFuture() { + return contentLengthFuture; + } +} diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/SignerUtils.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/SignerUtils.java index e4e0b711eb9e..34a0dcc33e36 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/SignerUtils.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/SignerUtils.java @@ -26,8 +26,12 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.checksums.SdkChecksum; import software.amazon.awssdk.http.ContentStreamProvider; @@ -37,6 +41,7 @@ import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Pair; import software.amazon.awssdk.utils.http.SdkHttpUtils; /** @@ -221,6 +226,39 @@ public static long moveContentLength(SdkHttpRequest.Builder request, ContentStre return contentLength; } + /** + * Move `Content-Length` to `x-amz-decoded-content-length` if not already present. If `Content-Length` is not present, then + * the payload is read in its entirety to calculate the length. + */ + public static CompletableFuture>> moveContentLength( + SdkHttpRequest.Builder request, Publisher contentPublisher) { + Optional decodedContentLength = request.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH); + + if (decodedContentLength.isPresent()) { + request.removeHeader(Header.CONTENT_LENGTH); + return CompletableFuture.completedFuture(Pair.of(request, contentPublisher)); + } + + CompletableFuture contentLengthFuture; + + Optional contentLengthFromHeader = + request.firstMatchingHeader(Header.CONTENT_LENGTH); + if (contentLengthFromHeader.isPresent()) { + long contentLength = Long.parseLong(contentLengthFromHeader.get()); + contentLengthFuture = CompletableFuture.completedFuture(contentLength); + } else { + LengthCalculatingSubscriber lengthCalculatingSubscriber = new LengthCalculatingSubscriber(); + contentPublisher.subscribe(lengthCalculatingSubscriber); + contentLengthFuture = lengthCalculatingSubscriber.contentLengthFuture(); + } + + return contentLengthFuture.thenApply(cl -> { + request.putHeader(X_AMZ_DECODED_CONTENT_LENGTH, String.valueOf(cl)) + .removeHeader(Header.CONTENT_LENGTH); + return Pair.of(request, contentPublisher); + }); + } + public static InputStream getBinaryRequestPayloadStream(ContentStreamProvider streamProvider) { try { if (streamProvider == null) { diff --git a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriberTckTest.java b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriberTckTest.java new file mode 100644 index 000000000000..72979d4c5a53 --- /dev/null +++ b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriberTckTest.java @@ -0,0 +1,44 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.auth.aws.internal.signer.io; + +import io.reactivex.subscribers.TestSubscriber; +import java.nio.ByteBuffer; +import java.util.Collections; +import org.reactivestreams.Subscriber; +import org.reactivestreams.tck.SubscriberBlackboxVerification; +import org.reactivestreams.tck.TestEnvironment; +import software.amazon.awssdk.checksums.DefaultChecksumAlgorithm; +import software.amazon.awssdk.checksums.SdkChecksum; + +public class UnbufferedChecksumSubscriberTckTest extends SubscriberBlackboxVerification { + + public UnbufferedChecksumSubscriberTckTest() { + super(new TestEnvironment()); + } + + @Override + public Subscriber createSubscriber() { + return new UnbufferedChecksumSubscriber( + Collections.singletonList(SdkChecksum.forAlgorithm(DefaultChecksumAlgorithm.CRC32)), + new TestSubscriber<>()); + } + + @Override + public ByteBuffer createElement(int element) { + return ByteBuffer.wrap(String.valueOf(element).getBytes()); + } +} diff --git a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriberTest.java b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriberTest.java new file mode 100644 index 000000000000..7de930442a80 --- /dev/null +++ b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/io/UnbufferedChecksumSubscriberTest.java @@ -0,0 +1,89 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.auth.aws.internal.signer.io; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import io.reactivex.Flowable; +import io.reactivex.subscribers.TestSubscriber; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.checksums.SdkChecksum; + +public class UnbufferedChecksumSubscriberTest { + @Test + void subscribe_updatesEachChecksumWithIdenticalData() { + List buffers = Arrays.asList(ByteBuffer.wrap("foo".getBytes()), + ByteBuffer.wrap("bar".getBytes()), + ByteBuffer.wrap("baz".getBytes())); + + Publisher publisher = Flowable.fromIterable(buffers); + + SdkChecksum checksum1 = Mockito.mock(SdkChecksum.class); + SdkChecksum checksum2 = Mockito.mock(SdkChecksum.class); + + List checksums = Arrays.asList(checksum1, checksum2); + + UnbufferedChecksumSubscriber subscriber = new UnbufferedChecksumSubscriber(checksums, new TestSubscriber<>()); + + publisher.subscribe(subscriber); + + for (SdkChecksum checksum : checksums) { + ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuffer.class); + Mockito.verify(checksum, Mockito.times(3)).update(captor.capture()); + assertThat(captor.getAllValues()).containsExactlyElementsOf(buffers); + } + } + + @Test + public void subscribe_onNextDelegatedToWrappedSubscriber() { + List buffers = Arrays.asList(ByteBuffer.wrap("foo".getBytes()), + ByteBuffer.wrap("bar".getBytes()), + ByteBuffer.wrap("baz".getBytes())); + + Publisher publisher = Flowable.fromIterable(buffers); + + SdkChecksum checksum = Mockito.mock(SdkChecksum.class); + + Subscriber wrappedSubscriber = Mockito.mock(Subscriber.class); + doAnswer(i -> { + ((Subscription) i.getArguments()[0]).request(Long.MAX_VALUE); + return null; + }).when(wrappedSubscriber).onSubscribe(any(Subscription.class)); + + UnbufferedChecksumSubscriber subscriber = new UnbufferedChecksumSubscriber(Collections.singletonList(checksum), + wrappedSubscriber); + + publisher.subscribe(subscriber); + + ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuffer.class); + + Mockito.verify(wrappedSubscriber, Mockito.times(3)).onNext(captor.capture()); + + assertThat(captor.getAllValues()).containsExactlyElementsOf(buffers); + } +} diff --git a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/LengthCalculatingSubscriberTckTest.java b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/LengthCalculatingSubscriberTckTest.java new file mode 100644 index 000000000000..946a073a56e4 --- /dev/null +++ b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/util/LengthCalculatingSubscriberTckTest.java @@ -0,0 +1,38 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.auth.aws.internal.signer.util; + +import java.nio.ByteBuffer; +import org.reactivestreams.Subscriber; +import org.reactivestreams.tck.SubscriberBlackboxVerification; +import org.reactivestreams.tck.TestEnvironment; + +public class LengthCalculatingSubscriberTckTest extends SubscriberBlackboxVerification { + + public LengthCalculatingSubscriberTckTest() { + super(new TestEnvironment()); + } + + @Override + public Subscriber createSubscriber() { + return new LengthCalculatingSubscriber(); + } + + @Override + public ByteBuffer createElement(int element) { + return ByteBuffer.wrap(Integer.toString(element).getBytes()); + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java index d7bb3c699fb5..f0764e4a4449 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HttpChecksumStage.java @@ -119,19 +119,6 @@ private SdkHttpFullRequest.Builder sraChecksum(SdkHttpFullRequest.Builder reques } executionAttributes.putAttribute(RESOLVED_CHECKSUM_SPECS, resolvedChecksumSpecs); - SdkHttpRequest httpRequest = context.executionContext().interceptorContext().httpRequest(); - - // TODO(sra-identity-and-auth): payload checksum calculation (trailer) for sync is done in AwsChunkedV4PayloadSigner, - // but async is still in this class. We should first add chunked encoding support for async to - // AwsChunkedV4PayloadSigner - // and remove the logic here. Details in https://github.com/aws/aws-sdk-java-v2/pull/4568 - if (clientType == ClientType.ASYNC && - isStreamingUnsignedPayload(httpRequest, executionAttributes, resolvedChecksumSpecs, - resolvedChecksumSpecs.isRequestStreaming())) { - addFlexibleChecksumInTrailer(request, context, resolvedChecksumSpecs); - return request; - } - return request; }