33
33
import static software .amazon .awssdk .services .s3 .internal .multipart .utils .MultipartDownloadTestUtils .internalErrorBody ;
34
34
import static software .amazon .awssdk .services .s3 .internal .multipart .utils .MultipartDownloadTestUtils .transformersSuppliers ;
35
35
36
+ import com .github .tomakehurst .wiremock .http .Fault ;
36
37
import com .github .tomakehurst .wiremock .junit5 .WireMockRuntimeInfo ;
37
38
import com .github .tomakehurst .wiremock .junit5 .WireMockTest ;
38
39
import com .github .tomakehurst .wiremock .stubbing .Scenario ;
39
40
import java .io .IOException ;
40
- import java .io . UncheckedIOException ;
41
+ import java .net . SocketException ;
41
42
import java .net .URI ;
42
- import java .nio .file .Files ;
43
- import java .nio .file .Path ;
44
43
import java .time .Duration ;
45
44
import java .util .ArrayList ;
46
45
import java .util .Arrays ;
49
48
import java .util .concurrent .CompletableFuture ;
50
49
import java .util .concurrent .CompletionException ;
51
50
import java .util .concurrent .TimeUnit ;
52
- import java .util .stream .IntStream ;
53
51
import java .util .stream .Stream ;
54
- import org .junit .jupiter .api .Assumptions ;
55
52
import org .junit .jupiter .api .BeforeEach ;
56
53
import org .junit .jupiter .api .Timeout ;
57
54
import org .junit .jupiter .params .ParameterizedTest ;
58
55
import org .junit .jupiter .params .provider .Arguments ;
59
56
import org .junit .jupiter .params .provider .MethodSource ;
60
- import org .reactivestreams .Subscriber ;
61
57
import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
62
58
import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
63
59
import software .amazon .awssdk .core .SplittingTransformerConfiguration ;
70
66
import software .amazon .awssdk .regions .Region ;
71
67
import software .amazon .awssdk .services .s3 .S3AsyncClient ;
72
68
import software .amazon .awssdk .services .s3 .internal .multipart .utils .MultipartDownloadTestUtils ;
73
- import software .amazon .awssdk .services .s3 .model .GetObjectRequest ;
74
69
import software .amazon .awssdk .services .s3 .model .GetObjectResponse ;
75
70
import software .amazon .awssdk .services .s3 .model .S3Exception ;
76
71
import software .amazon .awssdk .services .s3 .utils .AsyncResponseTransformerTestSupplier ;
@@ -82,7 +77,6 @@ public class S3MultipartClientGetObjectWiremockTest {
82
77
private static final String BUCKET = "Example-Bucket" ;
83
78
private static final String KEY = "Key" ;
84
79
private static final int MAX_ATTEMPTS = 3 ;
85
- private static int fileCounter = 0 ;
86
80
private S3AsyncClient multipartClient ;
87
81
private MultipartDownloadTestUtils util ;
88
82
@@ -112,10 +106,6 @@ <T> void happyPath_shouldReceiveAllBodyPartInCorrectOrder(AsyncResponseTransform
112
106
int partSize ) {
113
107
byte [] expectedBody = util .stubAllParts (BUCKET , KEY , amountOfPartToTest , partSize );
114
108
AsyncResponseTransformer <GetObjectResponse , T > transformer = supplier .transformer ();
115
- AsyncResponseTransformer .SplitResult <GetObjectResponse , T > split = transformer .split (
116
- SplittingTransformerConfiguration .builder ()
117
- .bufferSizeInBytes (1024 * 32L )
118
- .build ());
119
109
120
110
T response = multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ), transformer ).join ();
121
111
@@ -124,25 +114,6 @@ <T> void happyPath_shouldReceiveAllBodyPartInCorrectOrder(AsyncResponseTransform
124
114
util .verifyCorrectAmountOfRequestsMade (amountOfPartToTest );
125
115
}
126
116
127
- @ ParameterizedTest
128
- @ MethodSource ("partSizeAndTransformerParams" )
129
- <T > void nonRetryableErrorOnFirstPart_shouldFail (AsyncResponseTransformerTestSupplier <T > supplier ,
130
- int amountOfPartToTest ,
131
- int partSize ) {
132
- stubFor (get (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , KEY ))).willReturn (
133
- aResponse ()
134
- .withStatus (400 )
135
- .withBody ("<Error><Code>400</Code><Message>test error message</Message></Error>" )));
136
- AsyncResponseTransformer <GetObjectResponse , T > transformer = supplier .transformer ();
137
- AsyncResponseTransformer .SplitResult <GetObjectResponse , T > split = transformer .split (
138
- SplittingTransformerConfiguration .builder ()
139
- .bufferSizeInBytes (1024 * 32L )
140
- .build ());
141
-
142
- assertThatThrownBy (() -> multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ), transformer ).join ())
143
- .hasMessageContaining ("test error message" );
144
- }
145
-
146
117
@ ParameterizedTest
147
118
@ MethodSource ("partSizeAndTransformerParams" )
148
119
<T > void nonRetryableErrorOnThirdPart_shouldCompleteExceptionallyOnlyPartsGreaterThanTwo (
@@ -160,12 +131,6 @@ <T> void nonRetryableErrorOnThirdPart_shouldCompleteExceptionallyOnlyPartsGreate
160
131
SplittingTransformerConfiguration .builder ()
161
132
.bufferSizeInBytes (1024 * 32L )
162
133
.build ());
163
- Subscriber <AsyncResponseTransformer <GetObjectResponse , GetObjectResponse >> subscriber = new MultipartDownloaderSubscriber (
164
- multipartClient ,
165
- GetObjectRequest .builder ()
166
- .bucket (BUCKET )
167
- .key (KEY )
168
- .build ());
169
134
170
135
if (partSize > 1 ) {
171
136
assertThatThrownBy (() -> {
@@ -179,116 +144,46 @@ <T> void nonRetryableErrorOnThirdPart_shouldCompleteExceptionallyOnlyPartsGreate
179
144
}
180
145
181
146
@ ParameterizedTest
182
- @ MethodSource ("partSizeAndTransformerParams" )
183
- <T > void serverError_retryExhausted_shouldFail (AsyncResponseTransformerTestSupplier <T > supplier ,
184
- int amountOfPartToTest ,
185
- int partSize ) {
186
- util .stubSeverError (1 , internalErrorBody (), amountOfPartToTest );
147
+ @ MethodSource ("responseTransformers" )
148
+ <T > void nonRetryableErrorOnFirstPart_shouldFail (AsyncResponseTransformerTestSupplier <T > supplier ) {
149
+ stubFor (get (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , KEY ))).willReturn (
150
+ aResponse ()
151
+ .withStatus (400 )
152
+ .withBody ("<Error><Code>400</Code><Message>test error message</Message></Error>" )));
187
153
AsyncResponseTransformer <GetObjectResponse , T > transformer = supplier .transformer ();
188
154
189
-
190
- // Only enable this test for ByteArrayAsyncResponseTransformer because only ByteArrayAsyncResponseTransformer supports
191
- // retry
192
- Assumptions .assumeTrue (transformer instanceof ByteArrayAsyncResponseTransformer );
193
-
194
155
assertThatThrownBy (() -> multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ), transformer ).join ())
195
- .hasMessageContaining (" We encountered an internal error" );
196
- verify (MAX_ATTEMPTS , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=1" ,
197
- BUCKET , KEY ))));
156
+ .hasMessageContaining ("test error message" );
198
157
}
199
158
200
159
@ ParameterizedTest
201
- @ MethodSource ("partSizeAndTransformerParams" )
202
- <T > void serverError_retrySucceeds_shouldSucceed (AsyncResponseTransformerTestSupplier <T > supplier ,
203
- int amountOfPartToTest ,
204
- int partSize ) {
205
-
206
- byte [] expectedBody = util .stubFirst503Second200AllParts (amountOfPartToTest , partSize );
207
- AsyncResponseTransformer <GetObjectResponse , T > transformer = supplier .transformer ();
208
-
209
- // Only enable this test for ByteArrayAsyncResponseTransformer because only ByteArrayAsyncResponseTransformer supports
210
- // retry
211
- Assumptions .assumeTrue (transformer instanceof ByteArrayAsyncResponseTransformer );
212
-
213
- T response = multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ), transformer ).join ();
214
-
215
- byte [] body = supplier .body (response );
216
- assertArrayEquals (expectedBody , body );
217
- util .verifyCorrectAmountOfRequestsMade (amountOfPartToTest );
218
-
219
- IntStream .range (1 , amountOfPartToTest )
220
- .forEach (index -> verify (2 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=" + index ,
221
- BUCKET , KEY )))));
222
- }
223
-
224
- private static Stream <Arguments > partSizeAndTransformerParams () {
225
- // amount of part, individual part size
226
- List <Pair <Integer , Integer >> partSizes = Arrays .asList (
227
- Pair .of (4 , 16 ),
228
- Pair .of (1 , 1024 ),
229
- Pair .of (31 , 1243 ),
230
- Pair .of (16 , 16 * 1024 ),
231
- Pair .of (1 , 1024 * 1024 ),
232
- Pair .of (4 , 1024 * 1024 ),
233
- Pair .of (1 , 4 * 1024 * 1024 ),
234
- Pair .of (4 , 6 * 1024 * 1024 ),
235
- Pair .of (7 , 5 * 3752 )
236
- );
237
-
238
- Stream .Builder <Arguments > sb = Stream .builder ();
239
- transformersSuppliers ().forEach (tr -> partSizes .forEach (p -> sb .accept (arguments (tr , p .left (), p .right ()))));
240
- return sb .build ();
241
- }
242
-
243
- /**
244
- * Testing {@link PublisherAsyncResponseTransformer}, {@link InputStreamResponseTransformer}, and
245
- * {@link FileAsyncResponseTransformer}
246
- * <p>
247
- *
248
- * Retry for multipart download is supported for {@link ByteArrayAsyncResponseTransformer}, tested in
249
- * {@link S3MultipartClientGetObjectToBytesWiremockTest}.
250
- */
251
- private static Stream <TransformerFactory > responseTransformerFactories () {
252
- return Stream .of (
253
- AsyncResponseTransformer ::toBlockingInputStream ,
254
- AsyncResponseTransformer ::toPublisher ,
255
- () -> {
256
- try {
257
- Path tempDir = Files .createTempDirectory ("s3-test" );
258
- Path tempFile = tempDir .resolve ("testFile" + fileCounter + ".txt" );
259
- fileCounter ++;
260
- tempFile .toFile ().deleteOnExit ();
261
- return AsyncResponseTransformer .toFile (tempFile );
262
- } catch (IOException e ) {
263
- throw new UncheckedIOException (e );
264
- }
265
- }
266
- );
267
- }
160
+ @ MethodSource ("responseTransformers" )
161
+ public void ioError_shouldFailAndNotRetry () {
162
+ stubFor (get (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , KEY )))
163
+ .willReturn (aResponse ()
164
+ .withFault (Fault .CONNECTION_RESET_BY_PEER )));
268
165
269
- interface TransformerFactory {
270
- AsyncResponseTransformer <GetObjectResponse , ?> create ();
271
- }
166
+ assertThatThrownBy (() -> multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ),
167
+ AsyncResponseTransformer .toBlockingInputStream ()).join ())
168
+ .satisfiesAnyOf (
169
+ throwable -> assertThat (throwable )
170
+ .hasMessageContaining ("The connection was closed during the request" ),
272
171
273
- @ ParameterizedTest
274
- @ MethodSource ("responseTransformerFactories" )
275
- public void ioError_shouldFailAndNotRetry (TransformerFactory transformerFactory ) {
276
- util .stubIoError ( 1 );
277
- AsyncResponseTransformer <GetObjectResponse , ?> transformer = transformerFactory .create ();
172
+ throwable -> assertThat (throwable )
173
+ .hasMessageContaining ("Connection reset" )
174
+ );
278
175
279
- assertThatThrownBy (() -> multipartClient .getObject (b -> b .bucket (BUCKET ).key (KEY ), transformer ).join ())
280
- .hasCauseInstanceOf (IOException .class );
281
176
verify (1 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , KEY ))));
282
177
}
283
178
284
179
@ ParameterizedTest
285
- @ MethodSource ("responseTransformerFactories " )
286
- public void getObject_single500WithinMany200s_shouldNotRetryError (TransformerFactory transformerFactory ) {
180
+ @ MethodSource ("responseTransformers " )
181
+ public void getObject_single500WithinMany200s_shouldNotRetryError (AsyncResponseTransformerTestSupplier <?> transformerSupplier ) {
287
182
List <CompletableFuture <?>> futures = new ArrayList <>();
288
183
289
- int numRuns = 100 ;
184
+ int numRuns = 50 ;
290
185
for (int i = 0 ; i < numRuns ; i ++) {
291
- CompletableFuture <?> resp = mock200Response (multipartClient , i , transformerFactory );
186
+ CompletableFuture <?> resp = mock200Response (multipartClient , i , transformerSupplier );
292
187
futures .add (resp );
293
188
}
294
189
@@ -311,11 +206,11 @@ public void getObject_single500WithinMany200s_shouldNotRetryError(TransformerFac
311
206
.withBody ("Hello World" )));
312
207
313
208
CompletableFuture <?> requestWithRetryableError =
314
- multipartClient .getObject (r -> r .bucket (BUCKET ).key (errorKey ), transformerFactory . create ());
209
+ multipartClient .getObject (r -> r .bucket (BUCKET ).key (errorKey ), transformerSupplier . transformer ());
315
210
futures .add (requestWithRetryableError );
316
211
317
212
for (int i = 0 ; i < numRuns ; i ++) {
318
- CompletableFuture <?> resp = mock200Response (multipartClient , i + 1000 , transformerFactory );
213
+ CompletableFuture <?> resp = mock200Response (multipartClient , i + 1000 , transformerSupplier );
319
214
futures .add (resp );
320
215
}
321
216
@@ -329,7 +224,42 @@ public void getObject_single500WithinMany200s_shouldNotRetryError(TransformerFac
329
224
verify (1 , getRequestedFor (urlEqualTo (String .format ("/%s/%s?partNumber=1" , BUCKET , errorKey ))));
330
225
}
331
226
332
- private CompletableFuture <?> mock200Response (S3AsyncClient s3Client , int runNumber , TransformerFactory transformerFactory ) {
227
+ private static Stream <Arguments > partSizeAndTransformerParams () {
228
+ // amount of part, individual part size
229
+ List <Pair <Integer , Integer >> partSizes = Arrays .asList (
230
+ Pair .of (4 , 16 ),
231
+ Pair .of (1 , 1024 ),
232
+ Pair .of (31 , 1243 ),
233
+ Pair .of (16 , 16 * 1024 ),
234
+ Pair .of (1 , 1024 * 1024 ),
235
+ Pair .of (4 , 1024 * 1024 ),
236
+ Pair .of (1 , 4 * 1024 * 1024 ),
237
+ Pair .of (4 , 6 * 1024 * 1024 ),
238
+ Pair .of (7 , 5 * 3752 )
239
+ );
240
+
241
+ Stream .Builder <Arguments > sb = Stream .builder ();
242
+ transformersSuppliers ().forEach (tr -> partSizes .forEach (p -> sb .accept (arguments (tr , p .left (), p .right ()))));
243
+ return sb .build ();
244
+ }
245
+
246
+
247
+ /**
248
+ * Testing {@link PublisherAsyncResponseTransformer}, {@link InputStreamResponseTransformer}, and
249
+ * {@link FileAsyncResponseTransformer}
250
+ * <p>
251
+ *
252
+ * Retry for multipart download is supported for {@link ByteArrayAsyncResponseTransformer}, tested in
253
+ * {@link S3MultipartClientGetObjectToBytesWiremockTest}.
254
+ */
255
+ private static Stream <AsyncResponseTransformerTestSupplier <?>> responseTransformers () {
256
+ return Stream .of (new AsyncResponseTransformerTestSupplier .InputStreamArtSupplier (),
257
+ new AsyncResponseTransformerTestSupplier .PublisherArtSupplier (),
258
+ new AsyncResponseTransformerTestSupplier .FileArtSupplier ());
259
+ }
260
+
261
+ private CompletableFuture <?> mock200Response (S3AsyncClient s3Client , int runNumber ,
262
+ AsyncResponseTransformerTestSupplier <?> transformerSupplier ) {
333
263
String runId = runNumber + " success" ;
334
264
335
265
stubFor (any (anyUrl ())
@@ -342,6 +272,6 @@ private CompletableFuture<?> mock200Response(S3AsyncClient s3Client, int runNumb
342
272
343
273
return s3Client .getObject (r -> r .bucket (BUCKET ).key (KEY )
344
274
.overrideConfiguration (c -> c .putHeader ("RunNum" , runId )),
345
- transformerFactory . create ());
275
+ transformerSupplier . transformer ());
346
276
}
347
277
}
0 commit comments