23
23
import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_SIGNED_PAYLOAD_TRAILER ;
24
24
import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_UNSIGNED_PAYLOAD_TRAILER ;
25
25
import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_CONTENT_SHA256 ;
26
+ import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_DECODED_CONTENT_LENGTH ;
26
27
import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_TRAILER ;
27
28
import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils .moveContentLength ;
28
29
29
30
import java .nio .ByteBuffer ;
30
31
import java .nio .charset .StandardCharsets ;
31
32
import java .util .ArrayList ;
32
- import java .util .Collections ;
33
33
import java .util .List ;
34
+ import java .util .Optional ;
35
+ import java .util .concurrent .CompletableFuture ;
34
36
import org .reactivestreams .Publisher ;
35
37
import software .amazon .awssdk .annotations .SdkInternalApi ;
36
38
import software .amazon .awssdk .checksums .SdkChecksum ;
37
39
import software .amazon .awssdk .checksums .spi .ChecksumAlgorithm ;
38
40
import software .amazon .awssdk .http .ContentStreamProvider ;
39
41
import software .amazon .awssdk .http .Header ;
40
42
import software .amazon .awssdk .http .SdkHttpRequest ;
43
+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .AsyncChunkEncodedPayload ;
41
44
import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
42
45
import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
46
+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPayload ;
47
+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
43
48
import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4ChunkExtensionProvider ;
44
49
import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4TrailerProvider ;
50
+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SyncChunkEncodedPayload ;
45
51
import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
46
- import software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
47
52
import software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
48
53
import software .amazon .awssdk .utils .BinaryUtils ;
49
54
import software .amazon .awssdk .utils .Pair ;
@@ -73,51 +78,67 @@ public static Builder builder() {
73
78
74
79
@ Override
75
80
public ContentStreamProvider sign (ContentStreamProvider payload , V4RequestSigningResult requestSigningResult ) {
76
- SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
77
-
78
- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
79
- () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
80
- );
81
-
82
81
ChunkedEncodedInputStream .Builder chunkedEncodedInputStreamBuilder = ChunkedEncodedInputStream
83
82
.builder ()
84
83
.inputStream (payload .newStream ())
85
84
.chunkSize (chunkSize )
86
85
.header (chunk -> Integer .toHexString (chunk .remaining ()).getBytes (StandardCharsets .UTF_8 ));
87
86
88
- preExistingTrailers .forEach (trailer -> chunkedEncodedInputStreamBuilder .addTrailer (() -> trailer ));
87
+ SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload (chunkedEncodedInputStreamBuilder );
88
+ signCommon (chunkedPayload , requestSigningResult );
89
+
90
+ return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
91
+ }
92
+
93
+ @ Override
94
+ public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
95
+ ChunkedEncodedPublisher .Builder chunkedStreamBuilder = ChunkedEncodedPublisher .builder ()
96
+ .publisher (payload )
97
+ .chunkSize (chunkSize )
98
+ .addEmptyTrailingChunk (true );
99
+
100
+ AsyncChunkEncodedPayload checksumPayload = new AsyncChunkEncodedPayload (chunkedStreamBuilder );
101
+ signCommon (checksumPayload , requestSigningResult );
102
+
103
+ return chunkedStreamBuilder .build ();
104
+ }
105
+
106
+ private void signCommon (ChunkedEncodedPayload payload , V4RequestSigningResult requestSigningResult ) {
107
+ preExistingTrailers .forEach (t -> payload .addTrailer (() -> t ));
108
+
109
+ SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
110
+
111
+ payload .decodedContentLength (request .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
112
+ .map (Long ::parseLong )
113
+ .orElse (0L ));
114
+
115
+ String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
116
+ () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
117
+ );
89
118
90
119
switch (checksum ) {
91
120
case STREAMING_SIGNED_PAYLOAD : {
92
121
RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
93
122
requestSigningResult .getSignature ());
94
- chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
123
+ payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
95
124
break ;
96
125
}
97
126
case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
98
- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
127
+ setupChecksumTrailerIfNeeded (payload );
99
128
break ;
100
129
case STREAMING_SIGNED_PAYLOAD_TRAILER : {
130
+ setupChecksumTrailerIfNeeded (payload );
101
131
RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
102
132
requestSigningResult .getSignature ());
103
- chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
104
- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
105
- chunkedEncodedInputStreamBuilder .addTrailer (
106
- new SigV4TrailerProvider (chunkedEncodedInputStreamBuilder .trailers (), rollingSigner , credentialScope )
133
+ payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
134
+ payload .addTrailer (
135
+ new SigV4TrailerProvider (payload .trailers (), rollingSigner , credentialScope )
107
136
);
108
137
break ;
109
138
}
110
139
default :
111
140
throw new UnsupportedOperationException ();
112
141
}
113
-
114
- return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
115
- }
116
-
117
- @ Override
118
- public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
119
- // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage
120
- throw new UnsupportedOperationException ();
121
142
}
122
143
123
144
@ Override
@@ -127,27 +148,66 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
127
148
setupPreExistingTrailers (request );
128
149
129
150
// pre-existing trailers
151
+ encodedContentLength = calculateEncodedContentLength (request , contentLength );
152
+
153
+ if (checksumAlgorithm != null ) {
154
+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
155
+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
156
+ }
157
+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
158
+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
159
+ }
160
+
161
+ @ Override
162
+ public CompletableFuture <Pair <SdkHttpRequest .Builder , Optional <Publisher <ByteBuffer >>>> beforeSigningAsync (
163
+ SdkHttpRequest .Builder request , Publisher <ByteBuffer > payload ) {
164
+ return moveContentLength (request , payload )
165
+ .thenApply (p -> {
166
+ SdkHttpRequest .Builder requestBuilder = p .left ();
167
+ setupPreExistingTrailers (requestBuilder );
168
+
169
+ long decodedContentLength = requestBuilder .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
170
+ .map (Long ::parseLong )
171
+ // should not happen, this header is added by moveContentLength
172
+ .orElseThrow (() -> new RuntimeException (X_AMZ_DECODED_CONTENT_LENGTH
173
+ + " header not present" ));
174
+
175
+ long encodedContentLength = calculateEncodedContentLength (request , decodedContentLength );
176
+
177
+ if (checksumAlgorithm != null ) {
178
+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
179
+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
180
+ }
181
+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
182
+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
183
+ return Pair .of (requestBuilder , p .right ());
184
+ });
185
+ }
186
+
187
+ private long calculateEncodedContentLength (SdkHttpRequest .Builder requestBuilder , long decodedContentLength ) {
188
+ long encodedContentLength = 0 ;
189
+
130
190
encodedContentLength += calculateExistingTrailersLength ();
131
191
132
- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
192
+ String checksum = requestBuilder .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
133
193
() -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
134
194
);
135
195
136
196
switch (checksum ) {
137
197
case STREAMING_SIGNED_PAYLOAD : {
138
198
long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
139
- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
199
+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
140
200
break ;
141
201
}
142
202
case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
143
203
if (checksumAlgorithm != null ) {
144
204
encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
145
205
}
146
- encodedContentLength += calculateChunksLength (contentLength , 0 );
206
+ encodedContentLength += calculateChunksLength (decodedContentLength , 0 );
147
207
break ;
148
208
case STREAMING_SIGNED_PAYLOAD_TRAILER : {
149
209
long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
150
- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
210
+ encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
151
211
if (checksumAlgorithm != null ) {
152
212
encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
153
213
}
@@ -161,12 +221,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
161
221
// terminating \r\n
162
222
encodedContentLength += 2 ;
163
223
164
- if (checksumAlgorithm != null ) {
165
- String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
166
- request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
167
- }
168
- request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
169
- request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
224
+ return encodedContentLength ;
170
225
}
171
226
172
227
/**
@@ -250,25 +305,17 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) {
250
305
return lengthInBytes + 2 ;
251
306
}
252
307
253
- /**
254
- * Add the checksum as a trailer to the chunk-encoded stream.
255
- * <p>
256
- * If the checksum-algorithm is not present, then nothing is done.
257
- */
258
- private void setupChecksumTrailerIfNeeded (ChunkedEncodedInputStream .Builder builder ) {
308
+ private void setupChecksumTrailerIfNeeded (ChunkedEncodedPayload payload ) {
259
309
if (checksumAlgorithm == null ) {
260
310
return ;
261
311
}
262
312
String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
263
313
SdkChecksum sdkChecksum = fromChecksumAlgorithm (checksumAlgorithm );
264
- ChecksumInputStream checksumInputStream = new ChecksumInputStream (
265
- builder .inputStream (),
266
- Collections .singleton (sdkChecksum )
267
- );
268
314
269
315
TrailerProvider checksumTrailer = new ChecksumTrailerProvider (sdkChecksum , checksumHeaderName );
270
316
271
- builder .inputStream (checksumInputStream ).addTrailer (checksumTrailer );
317
+ payload .checksumPayload (sdkChecksum );
318
+ payload .addTrailer (checksumTrailer );
272
319
}
273
320
274
321
static class Builder {
0 commit comments