From 72a515007bf2e3af3dd822fed9194d825a0bd587 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Mon, 18 Aug 2025 15:02:36 -0700 Subject: [PATCH 1/2] Avoid extra byte array copying when downloading to memory with AsyncResponseTransformer --- .../feature-AWSSDKforJavav2-dd9f8bf.json | 6 +++ .../amazon/awssdk/core/ResponseBytes.java | 28 ++++++++++++ .../ByteArrayAsyncResponseTransformer.java | 43 +++++++++++++------ .../amazon/awssdk/core/ResponseBytesTest.java | 25 +++++++++-- .../async/FileAsyncRequestBodyTest.java | 22 +++++----- ...PublisherAsyncResponseTransformerTest.java | 10 +++-- .../async/SplittingPublisherTestUtils.java | 18 +++----- 7 files changed, 112 insertions(+), 40 deletions(-) create mode 100644 .changes/next-release/feature-AWSSDKforJavav2-dd9f8bf.json diff --git a/.changes/next-release/feature-AWSSDKforJavav2-dd9f8bf.json b/.changes/next-release/feature-AWSSDKforJavav2-dd9f8bf.json new file mode 100644 index 000000000000..1490f7d94aae --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-dd9f8bf.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Avoid extra byte array copying when downloading to memory with AsyncResponseTransformer" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java index 2b3ed0523cca..193e0482ca8e 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java @@ -19,6 +19,7 @@ import java.io.InputStream; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.util.Arrays; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.core.async.AsyncResponseTransformer; @@ -68,6 +69,33 @@ public static ResponseBytes fromByteArrayUnsafe(ResponseT return new ResponseBytes<>(response, bytes); 
} + /** + * Creates ResponseBytes from a ByteBuffer without copying the underlying data. + * + * @param response the response object containing metadata + * @param buffer the ByteBuffer containing the response body data + * @return ResponseBytes wrapping the buffer data + */ + public static ResponseBytes fromByteBufferUnsafe(ResponseT response, ByteBuffer buffer) { + byte[] array; + if (buffer.hasArray()) { + array = buffer.array(); + int offset = buffer.arrayOffset() + buffer.position(); + int length = buffer.remaining(); + if (offset == 0 && length == array.length) { + // Perfect match - use array directly + } else { + // Create view of the relevant portion + array = Arrays.copyOfRange(array, offset, offset + length); + } + } else { + // Direct buffer - must copy to array + array = new byte[buffer.remaining()]; + buffer.get(array); + } + return new ResponseBytes<>(response, array); + } + /** * @return the unmarshalled response object from the service. */ diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java index 4d0fd4f33ab4..baf59ecc1a2b 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java @@ -41,15 +41,15 @@ public final class ByteArrayAsyncResponseTransformer implements AsyncResponseTransformer> { - private volatile CompletableFuture cf; + private volatile CompletableFuture cf; private volatile ResponseT response; @Override public CompletableFuture> prepare() { cf = new CompletableFuture<>(); - // Using fromByteArrayUnsafe() to avoid unnecessary extra copying of byte array. 
The data writing has completed and the - // byte array will not be further modified so this is safe - return cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr)); + // Using fromByteBufferUnsafe() to avoid unnecessary extra copying of byte array. The data writing has completed and the + // byte buffer will not be further modified so this is safe + return cf.thenApply(buffer -> ResponseBytes.fromByteBufferUnsafe(response, buffer)); } @Override @@ -73,13 +73,11 @@ public String name() { } static class BaosSubscriber implements Subscriber { - private final CompletableFuture resultFuture; - - private ByteArrayOutputStream baos = new ByteArrayOutputStream(); - + private final CompletableFuture resultFuture; + private DirectAccessByteArrayOutputStream directAccessOutputStream = new DirectAccessByteArrayOutputStream(); private Subscription subscription; - BaosSubscriber(CompletableFuture resultFuture) { + BaosSubscriber(CompletableFuture resultFuture) { this.resultFuture = resultFuture; } @@ -95,19 +93,38 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuffer byteBuffer) { - invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer))); - subscription.request(1); + invokeSafely(() -> { + if (byteBuffer.hasArray()) { + directAccessOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), + byteBuffer.remaining()); + } else { + directAccessOutputStream.write(BinaryUtils.copyBytesFrom(byteBuffer)); + } + }); } @Override public void onError(Throwable throwable) { - baos = null; + directAccessOutputStream = null; resultFuture.completeExceptionally(throwable); } @Override public void onComplete() { - resultFuture.complete(baos.toByteArray()); + resultFuture.complete(directAccessOutputStream.toByteBuffer()); + } + } + + /** + * Custom ByteArrayOutputStream that exposes internal buffer without copying + */ + static class DirectAccessByteArrayOutputStream extends ByteArrayOutputStream { + + /** + 
* Returns the internal buffer wrapped as ByteBuffer with length set to count. + */ + ByteBuffer toByteBuffer() { + return ByteBuffer.wrap(buf, 0, count); } } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java index f76b86f73ddf..471d2eb885f4 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java @@ -17,13 +17,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.ByteBuffer; import org.junit.jupiter.api.Test; public class ResponseBytesTest { private static final Object OBJECT = new Object(); @Test public void fromByteArrayCreatesCopy() { - byte[] input = new byte[] { 'a' }; + byte[] input = {'a'}; byte[] output = ResponseBytes.fromByteArray(OBJECT, input).asByteArrayUnsafe(); input[0] = 'b'; @@ -32,7 +33,7 @@ public void fromByteArrayCreatesCopy() { @Test public void asByteArrayCreatesCopy() { - byte[] input = new byte[] { 'a' }; + byte[] input = {'a'}; byte[] output = ResponseBytes.fromByteArrayUnsafe(OBJECT, input).asByteArray(); input[0] = 'b'; @@ -41,9 +42,27 @@ public void asByteArrayCreatesCopy() { @Test public void fromByteArrayUnsafeAndAsByteArrayUnsafeDoNotCopy() { - byte[] input = new byte[] { 'a' }; + byte[] input = {'a'}; byte[] output = ResponseBytes.fromByteArrayUnsafe(OBJECT, input).asByteArrayUnsafe(); assertThat(output).isSameAs(input); } + + @Test + public void fromByteBufferUnsafe_doNotCopy() { + byte[] inputBytes = {'a'}; + ByteBuffer inputBuffer = ByteBuffer.wrap(inputBytes); + + ResponseBytes responseBytes = ResponseBytes.fromByteBufferUnsafe(OBJECT, inputBuffer); + + ByteBuffer outputBuffer = responseBytes.asByteBuffer(); + byte[] outputBytes = responseBytes.asByteArrayUnsafe(); + + assertThat(outputBuffer).isEqualTo(inputBuffer); + 
assertThat(outputBytes).isSameAs(inputBytes); + + inputBytes[0] = 'b'; + assertThat(outputBuffer).isEqualTo(inputBuffer); + assertThat(outputBytes).isEqualTo(inputBytes); + } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java index 5d12035c1879..4f20c77700ea 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertTrue; -import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import java.io.ByteArrayOutputStream; @@ -38,12 +37,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.utils.BinaryUtils; @@ -236,7 +232,7 @@ public void changingFile_fileGetsDeleted_failsBecauseDeleted() throws Exception @Test public void positionNotZero_shouldReadFromPosition() throws Exception { - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); long position = 20L; AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() 
.path(smallFile) @@ -249,7 +245,9 @@ public void positionNotZero_shouldReadFromPosition() throws Exception { asyncRequestBody.subscribe(baosSubscriber); assertThat(asyncRequestBody.contentLength()).contains(80L); - byte[] bytes = future.get(1, TimeUnit.SECONDS); + ByteBuffer buffer = future.get(1, TimeUnit.SECONDS); + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); byte[] expected = new byte[80]; try(FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { @@ -262,7 +260,7 @@ public void positionNotZero_shouldReadFromPosition() throws Exception { @Test public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Exception { - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); long position = 20L; long numBytesToRead = 5L; AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() @@ -277,7 +275,9 @@ public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Excepti asyncRequestBody.subscribe(baosSubscriber); assertThat(asyncRequestBody.contentLength()).contains(numBytesToRead); - byte[] bytes = future.get(1, TimeUnit.SECONDS); + ByteBuffer buffer = future.get(1, TimeUnit.SECONDS); + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); byte[] expected = new byte[5]; try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { @@ -290,7 +290,7 @@ public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Excepti @Test public void numBytesToReadConfigured_shouldHonor() throws Exception { - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() .path(smallFile) .numBytesToRead(5L) @@ -302,7 +302,9 @@ public void numBytesToReadConfigured_shouldHonor() throws Exception { asyncRequestBody.subscribe(baosSubscriber); assertThat(asyncRequestBody.contentLength()).contains(5L); - byte[] 
bytes = future.get(1, TimeUnit.SECONDS); + ByteBuffer buffer = future.get(1, TimeUnit.SECONDS); + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); byte[] expected = new byte[5]; try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) { diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java index 321993f63838..f638e1feb98f 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java @@ -83,9 +83,13 @@ void failedStream_completesExceptionally() { } private static String drainPublisherToStr(SdkPublisher publisher) throws Exception { - CompletableFuture bodyFuture = new CompletableFuture<>(); + CompletableFuture bodyFuture = new CompletableFuture<>(); publisher.subscribe(new BaosSubscriber(bodyFuture)); - byte[] body = bodyFuture.get(); - return new String(body); + + ByteBuffer buffer = bodyFuture.get(); + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + + return new String(bytes); } } \ No newline at end of file diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java index 04da97adbf42..6291df285d31 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java @@ -15,23 +15,16 @@ package software.amazon.awssdk.core.internal.async; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; - -import 
java.io.File; import java.io.FileInputStream; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.assertj.core.api.Assertions; -import org.reactivestreams.Publisher; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.SdkPublisher; -import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; -import software.amazon.awssdk.core.internal.async.SplittingPublisherTest; public final class SplittingPublisherTestUtils { @@ -39,9 +32,9 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher> futures = new ArrayList<>(); + List> futures = new ArrayList<>(); publisher.subscribe(requestBody -> { - CompletableFuture baosFuture = new CompletableFuture<>(); + CompletableFuture baosFuture = new CompletableFuture<>(); ByteArrayAsyncResponseTransformer.BaosSubscriber subscriber = new ByteArrayAsyncResponseTransformer.BaosSubscriber(baosFuture); requestBody.subscribe(subscriber); @@ -62,7 +55,10 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher Date: Mon, 18 Aug 2025 17:38:54 -0700 Subject: [PATCH 2/2] Add tests and update javadocs --- .../amazon/awssdk/core/ResponseBytes.java | 23 +++++++-- .../amazon/awssdk/core/ResponseBytesTest.java | 47 +++++++++++++++++-- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java index 193e0482ca8e..305b826c4340 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java @@ -70,11 +70,24 @@ public static ResponseBytes 
fromByteArrayUnsafe(ResponseT } /** - * Creates ResponseBytes from a ByteBuffer without copying the underlying data. + * Create {@link ResponseBytes} from a {@link ByteBuffer} with minimal copying. This method attempts to avoid + * copying data when possible, but introduces concurrency risks in specific scenarios. * - * @param response the response object containing metadata - * @param buffer the ByteBuffer containing the response body data - * @return ResponseBytes wrapping the buffer data + *

<p>Behavior by buffer type: + *
<ul> + *
<li>Array-backed ByteBuffer (perfect match): When the buffer represents the entire backing array + * (offset=0, remaining=array.length), the array is shared without copying. This introduces the same + * concurrency risks as {@link #fromByteArrayUnsafe(Object, byte[])}: modifications to the original + * backing array will affect the returned {@link ResponseBytes}.</li> + *
<li>Array-backed ByteBuffer (partial): When the buffer represents only a portion of the backing array, + * data is copied to a new array. No concurrency risks.</li> + *
<li>Direct ByteBuffer: Data is always copied to a heap array. No concurrency risks.</li> + *
</ul> + * + *
<p>The buffer's position is preserved and not modified by this operation. + * + *
<p>
As the method name implies, this is unsafe in the first scenario. Use a safe alternative unless you're + * sure you know the risks. */ public static ResponseBytes fromByteBufferUnsafe(ResponseT response, ByteBuffer buffer) { byte[] array; @@ -91,7 +104,9 @@ public static ResponseBytes fromByteBufferUnsafe(Response } else { // Direct buffer - must copy to array array = new byte[buffer.remaining()]; + int originalPosition = buffer.position(); buffer.get(array); + buffer.position(originalPosition); } return new ResponseBytes<>(response, array); } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java index 471d2eb885f4..17915a538fbe 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java @@ -49,20 +49,57 @@ public void fromByteArrayUnsafeAndAsByteArrayUnsafeDoNotCopy() { } @Test - public void fromByteBufferUnsafe_doNotCopy() { + public void fromByteBufferUnsafe_fullBuffer_doesNotCopy() { byte[] inputBytes = {'a'}; ByteBuffer inputBuffer = ByteBuffer.wrap(inputBytes); ResponseBytes responseBytes = ResponseBytes.fromByteBufferUnsafe(OBJECT, inputBuffer); - - ByteBuffer outputBuffer = responseBytes.asByteBuffer(); byte[] outputBytes = responseBytes.asByteArrayUnsafe(); - assertThat(outputBuffer).isEqualTo(inputBuffer); + assertThat(inputBuffer.hasArray()).isTrue(); + assertThat(inputBuffer.isDirect()).isFalse(); assertThat(outputBytes).isSameAs(inputBytes); inputBytes[0] = 'b'; - assertThat(outputBuffer).isEqualTo(inputBuffer); + assertThat(outputBytes[0]).isEqualTo((byte) 'b'); + } + + @Test + public void fromByteBufferUnsafe_directBuffer_createsCopy() { + byte[] inputBytes = {'a'}; + ByteBuffer directBuffer = ByteBuffer.allocateDirect(1); + directBuffer.put(inputBytes); + directBuffer.flip(); + + ResponseBytes responseBytes = 
ResponseBytes.fromByteBufferUnsafe(OBJECT, directBuffer); + ByteBuffer outputBuffer = responseBytes.asByteBuffer(); + byte[] outputBytes = responseBytes.asByteArrayUnsafe(); + + assertThat(directBuffer.hasArray()).isFalse(); + assertThat(directBuffer.isDirect()).isTrue(); + assertThat(outputBuffer.isDirect()).isFalse(); assertThat(outputBytes).isEqualTo(inputBytes); + assertThat(outputBytes).isNotSameAs(inputBytes); + + inputBytes[0] = 'b'; + assertThat(outputBytes[0]).isNotEqualTo((byte) 'b'); + } + + @Test + public void fromByteBufferUnsafe_bufferWithOffset_createsCopy() { + byte[] inputBytes = "abcdefgh".getBytes(); + + ByteBuffer slicedBuffer = ByteBuffer.wrap(inputBytes, 2, 3); // "cde" + + ResponseBytes responseBytes = ResponseBytes.fromByteBufferUnsafe(OBJECT, slicedBuffer); + byte[] outputBytes = responseBytes.asByteArrayUnsafe(); + + assertThat(slicedBuffer.hasArray()).isTrue(); + assertThat(outputBytes).isEqualTo("cde".getBytes()); + assertThat(outputBytes.length).isEqualTo(3); + assertThat(outputBytes).isNotSameAs(inputBytes); + + inputBytes[0] = 'X'; + assertThat(outputBytes[0]).isEqualTo((byte) 'c'); } }