3131import java .util .ArrayList ;
3232import java .util .Collections ;
3333import java .util .List ;
34+ import java .util .concurrent .CompletableFuture ;
35+ import java .util .stream .Collectors ;
3436import org .reactivestreams .Publisher ;
3537import software .amazon .awssdk .annotations .SdkInternalApi ;
3638import software .amazon .awssdk .checksums .SdkChecksum ;
4042import software .amazon .awssdk .http .SdkHttpRequest ;
4143import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
4244import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
45+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
4346import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4ChunkExtensionProvider ;
4447import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4TrailerProvider ;
4548import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
4649import software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
4750import software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
51+ import software .amazon .awssdk .http .auth .aws .internal .signer .io .UnbufferedChecksumSubscriber ;
4852import software .amazon .awssdk .utils .BinaryUtils ;
4953import software .amazon .awssdk .utils .Pair ;
5054import software .amazon .awssdk .utils .Validate ;
@@ -116,8 +120,44 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4RequestSignin
116120
117121 @ Override
118122 public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
119- // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage
120- throw new UnsupportedOperationException ();
123+ ChunkedEncodedPublisher .Builder chunkedStreamBuilder = ChunkedEncodedPublisher .builder ()
124+ .publisher (payload )
125+ .chunkSize (chunkSize )
126+ .addEmptyTrailingChunk (true );
127+
128+ preExistingTrailers .forEach (t -> chunkedStreamBuilder .addTrailer (() -> t ));
129+
130+ SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
131+
132+ String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
133+ () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
134+ );
135+
136+ switch (checksum ) {
137+ case STREAMING_SIGNED_PAYLOAD : {
138+ RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
139+ requestSigningResult .getSignature ());
140+ chunkedStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
141+ break ;
142+ }
143+ case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
144+ setupChecksumTrailerIfNeeded (chunkedStreamBuilder );
145+ break ;
146+ case STREAMING_SIGNED_PAYLOAD_TRAILER : {
147+ setupChecksumTrailerIfNeeded (chunkedStreamBuilder );
148+ RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
149+ requestSigningResult .getSignature ());
150+ chunkedStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
151+ chunkedStreamBuilder .addTrailer (
152+ new SigV4TrailerProvider (chunkedStreamBuilder .trailers (), rollingSigner , credentialScope )
153+ );
154+ break ;
155+ }
156+ default :
157+ throw new UnsupportedOperationException ();
158+ }
159+
160+ return chunkedStreamBuilder .build ();
121161 }
122162
123163 @ Override
@@ -127,27 +167,66 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
127167 setupPreExistingTrailers (request );
128168
129169 // pre-existing trailers
170+ encodedContentLength = calculateEncodedContentLength (request , contentLength );
171+
172+ if (checksumAlgorithm != null ) {
173+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
174+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
175+ }
176+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
177+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
178+ }
179+
180+ @ Override
181+ public CompletableFuture <Pair <SdkHttpRequest .Builder , Publisher <ByteBuffer >>> beforeSigningAsync (
182+ SdkHttpRequest .Builder request , Publisher <ByteBuffer > payload ) {
183+ return moveContentLength (request , payload )
184+ .thenApply (p -> {
185+ SdkHttpRequest .Builder requestBuilder = p .left ();
186+ setupPreExistingTrailers (requestBuilder );
187+
188+ long decodedContentLength = requestBuilder .firstMatchingHeader ("x-amz-decoded-content-length" )
189+ .map (Long ::parseLong )
190+ // should not happen, this header is added by moveContentLength
191+ .orElseThrow (() -> new RuntimeException ("x-amz-decoded-content-length "
192+ + "header not present" ));
193+
194+ long encodedContentLength = calculateEncodedContentLength (request , decodedContentLength );
195+
196+ if (checksumAlgorithm != null ) {
197+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
198+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
199+ }
200+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
201+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
202+ return Pair .of (requestBuilder , p .right ());
203+ });
204+ }
205+
206+ private long calculateEncodedContentLength (SdkHttpRequest .Builder requestBuilder , long decodedContentLength ) {
207+ long encodedContentLength = 0 ;
208+
130209 encodedContentLength += calculateExistingTrailersLength ();
131210
132- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
211+ String checksum = requestBuilder .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
133212 () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
134213 );
135214
136215 switch (checksum ) {
137216 case STREAMING_SIGNED_PAYLOAD : {
138217 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
139- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
218+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
140219 break ;
141220 }
142221 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
143222 if (checksumAlgorithm != null ) {
144223 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
145224 }
146- encodedContentLength += calculateChunksLength (contentLength , 0 );
225+ encodedContentLength += calculateChunksLength (decodedContentLength , 0 );
147226 break ;
148227 case STREAMING_SIGNED_PAYLOAD_TRAILER : {
149228 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
150- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
229+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
151230 if (checksumAlgorithm != null ) {
152231 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
153232 }
@@ -161,12 +240,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
161240 // terminating \r\n
162241 encodedContentLength += 2 ;
163242
164- if (checksumAlgorithm != null ) {
165- String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
166- request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
167- }
168- request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
169- request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
243+ return encodedContentLength ;
170244 }
171245
172246 /**
@@ -271,6 +345,24 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil
271345 builder .inputStream (checksumInputStream ).addTrailer (checksumTrailer );
272346 }
273347
348+ private void setupChecksumTrailerIfNeeded (ChunkedEncodedPublisher .Builder builder ) {
349+ if (checksumAlgorithm != null ) {
350+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
351+ SdkChecksum sdkChecksum = fromChecksumAlgorithm (checksumAlgorithm );
352+ Publisher <ByteBuffer > checksummedPayload = computeChecksum (builder .publisher (), sdkChecksum );
353+
354+ builder .publisher (checksummedPayload );
355+
356+ TrailerProvider checksumTrailer = new ChecksumTrailerProvider (sdkChecksum , checksumHeaderName );
357+ builder .addTrailer (checksumTrailer );
358+ }
359+ }
360+
361+ private Publisher <ByteBuffer > computeChecksum (Publisher <ByteBuffer > publisher , SdkChecksum checksum ) {
362+ return subscriber -> publisher .subscribe (
363+ new UnbufferedChecksumSubscriber (Collections .singletonList (checksum ), subscriber ));
364+ }
365+
274366 static class Builder {
275367 private CredentialScope credentialScope ;
276368 private Integer chunkSize ;
0 commit comments