5151import software .amazon .awssdk .core .async .SdkPublisher ;
5252import software .amazon .awssdk .http .SdkHttpResponse ;
5353import software .amazon .awssdk .utils .CompletableFutureUtils ;
54+ import software .amazon .awssdk .utils .ContentRangeParser ;
5455
5556class FileAsyncResponseTransformerPublisherTest {
5657
@@ -239,11 +240,17 @@ public void onComplete() {
239240 }
240241
241242 private SdkResponse createMockResponseWithRange (String contentRange ) {
243+ return createMockResponseWithRange (contentRange ,
244+ ContentRangeParser .totalBytes (contentRange ).getAsLong ());
245+ }
246+
247+ private SdkResponse createMockResponseWithRange (String contentRange , Long contentLength ) {
242248 SdkResponse mockResponse = mock (SdkResponse .class );
243249 SdkHttpResponse mockHttpResponse = mock (SdkHttpResponse .class );
244250
245251 when (mockResponse .sdkHttpResponse ()).thenReturn (mockHttpResponse );
246- when (mockHttpResponse .firstMatchingHeader ("x-amz-content-range" )).thenReturn (Optional .of (contentRange ));
252+ when (mockHttpResponse .firstMatchingHeader ("content-length" )).thenReturn (Optional .ofNullable (String .valueOf (contentLength )));
253+ when (mockHttpResponse .firstMatchingHeader ("x-amz-content-range" )).thenReturn (Optional .ofNullable (contentRange ));
247254
248255 return mockResponse ;
249256 }
@@ -298,7 +305,101 @@ void createOrAppendToExisting_shouldThrowException() {
298305 assertThatThrownBy (() -> new FileAsyncResponseTransformerPublisher <>((FileAsyncResponseTransformer <?>) initialTransformer ))
299306 .isInstanceOf (IllegalArgumentException .class )
300307 .hasMessageContaining ("CREATE_OR_APPEND_TO_EXISTING" );
308+ }
309+
310+ @ Test
311+ void singleDemand_contentRangeMissing_shouldSucceed () throws Exception {
312+ AsyncResponseTransformer <SdkResponse , SdkResponse > initialTransformer = AsyncResponseTransformer .toFile (testFile );
313+ FileAsyncResponseTransformerPublisher <SdkResponse > publisher =
314+ new FileAsyncResponseTransformerPublisher <>((FileAsyncResponseTransformer <SdkResponse >) initialTransformer );
315+
316+ CountDownLatch latch = new CountDownLatch (1 );
317+ AtomicReference <AsyncResponseTransformer <SdkResponse , SdkResponse >> receivedTransformer = new AtomicReference <>();
318+ CompletableFuture <SdkResponse > future = new CompletableFuture <>();
301319
320+ publisher .subscribe (new Subscriber <AsyncResponseTransformer <SdkResponse , SdkResponse >>() {
321+ private Subscription subscription ;
322+
323+ @ Override
324+ public void onSubscribe (Subscription s ) {
325+ this .subscription = s ;
326+ s .request (1 );
327+ }
328+
329+ @ Override
330+ public void onNext (AsyncResponseTransformer <SdkResponse , SdkResponse > transformer ) {
331+ receivedTransformer .set (transformer );
332+
333+ // Simulate response with content-range header
334+ SdkResponse mockResponse = createMockResponseWithRange (null , 0L );
335+ CompletableFuture <SdkResponse > prepareFuture = transformer .prepare ();
336+ CompletableFutureUtils .forwardResultTo (prepareFuture , future );
337+ transformer .onResponse (mockResponse );
338+
339+ // Simulate stream data
340+ SdkPublisher <ByteBuffer > mockPublisher = createMockPublisher ();
341+ transformer .onStream (mockPublisher );
342+
343+ latch .countDown ();
344+ }
345+
346+ @ Override
347+ public void onError (Throwable t ) {
348+ fail ("Unexpected error with exception: " + t .getMessage ());
349+ }
350+
351+ @ Override
352+ public void onComplete () {
353+ latch .countDown ();
354+ }
355+ });
356+
357+ assertThat (latch .await (5 , TimeUnit .SECONDS )).isTrue ();
358+ assertThat (receivedTransformer .get ()).isNotNull ();
359+ assertThat (Files .exists (testFile )).isTrue ();
360+ assertThat (future ).succeedsWithin (10 , TimeUnit .SECONDS );
361+ }
362+
363+ @ Test
364+ void multipleTransformers_contentRangeMissingOnSecondRequest_shouldFail () throws Exception {
365+ AsyncResponseTransformer <SdkResponse , SdkResponse > initialTransformer = AsyncResponseTransformer .toFile (testFile );
366+ FileAsyncResponseTransformerPublisher <SdkResponse > publisher =
367+ new FileAsyncResponseTransformerPublisher <>((FileAsyncResponseTransformer <SdkResponse >) initialTransformer );
368+
369+ CountDownLatch latch = new CountDownLatch (1 );
370+ CompletableFuture <SdkResponse > future = new CompletableFuture <>();
371+ AtomicReference <Throwable > exception = new AtomicReference <>();
372+
373+ publisher .subscribe (new Subscriber <AsyncResponseTransformer <SdkResponse , SdkResponse >>() {
374+
375+ @ Override
376+ public void onSubscribe (Subscription s ) {
377+ s .request (2 );
378+ }
379+
380+ @ Override
381+ public void onNext (AsyncResponseTransformer <SdkResponse , SdkResponse > transformer ) {
382+ SdkResponse mockResponse = createMockResponseWithRange (null , 0L );
383+
384+ CompletableFuture <SdkResponse > prepareFuture = transformer .prepare ();
385+ CompletableFutureUtils .forwardResultTo (prepareFuture , future );
386+ transformer .onResponse (mockResponse );
387+ }
388+
389+ @ Override
390+ public void onError (Throwable t ) {
391+ exception .set (t );
392+ latch .countDown ();
393+ }
394+
395+ @ Override
396+ public void onComplete () {
397+ fail ("Unexpected onComplete" );
398+ }
399+ });
400+
401+ assertThat (latch .await (5 , TimeUnit .SECONDS )).isTrue ();
402+ assertThat (exception .get ()).hasMessageContaining ("Content range header is missing" );
302403 }
303404
304405}
0 commit comments