Skip to content

Commit 08bae37

Browse files
authored
Avoid extra byte array copying when downloading to memory with AsyncResponseTransformer (#6355)
* Avoid extra byte array copying when downloading to memory with AsyncResponseTransformer * Add tests and update javadocs
1 parent 09a4b8f commit 08bae37

File tree

7 files changed

+164
-40
lines changed

7 files changed

+164
-40
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Avoid extra byte array copying when downloading to memory with AsyncResponseTransformer"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.InputStream;
2121
import java.io.UncheckedIOException;
22+
import java.nio.ByteBuffer;
2223
import java.util.Arrays;
2324
import software.amazon.awssdk.annotations.SdkPublicApi;
2425
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -68,6 +69,48 @@ public static <ResponseT> ResponseBytes<ResponseT> fromByteArrayUnsafe(ResponseT
6869
return new ResponseBytes<>(response, bytes);
6970
}
7071

72+
/**
73+
* Create {@link ResponseBytes} from a {@link ByteBuffer} with minimal copying. This method attempts to avoid
74+
* copying data when possible, but introduces concurrency risks in specific scenarios.
75+
*
76+
* <p><b>Behavior by buffer type:</b>
77+
* <ul>
78+
* <li><b>Array-backed ByteBuffer (perfect match):</b> When the buffer represents the entire backing array
79+
* (offset=0, remaining=array.length), the array is shared <b>without</b> copying. This introduces the same
80+
* concurrency risks as {@link #fromByteArrayUnsafe(Object, byte[])}: modifications to the original
81+
* backing array will affect the returned {@link ResponseBytes}.</li>
82+
* <li><b>Array-backed ByteBuffer (partial):</b> When the buffer represents only a portion of the backing array,
83+
* data is copied to a new array. No concurrency risks.</li>
84+
* <li><b>Direct ByteBuffer:</b> Data is always copied to a heap array. No concurrency risks.</li>
85+
* </ul>
86+
*
87+
* <p>The buffer's position is preserved and not modified by this operation.
88+
*
89+
* <p>As the method name implies, this is unsafe in the first scenario. Use a safe alternative unless you're
90+
* sure you know the risks.
91+
*/
92+
public static <ResponseT> ResponseBytes<ResponseT> fromByteBufferUnsafe(ResponseT response, ByteBuffer buffer) {
93+
byte[] array;
94+
if (buffer.hasArray()) {
95+
array = buffer.array();
96+
int offset = buffer.arrayOffset() + buffer.position();
97+
int length = buffer.remaining();
98+
if (offset == 0 && length == array.length) {
99+
// Perfect match - use array directly
100+
} else {
101+
// Create view of the relevant portion
102+
array = Arrays.copyOfRange(array, offset, offset + length);
103+
}
104+
} else {
105+
// Direct buffer - must copy to array
106+
array = new byte[buffer.remaining()];
107+
int originalPosition = buffer.position();
108+
buffer.get(array);
109+
buffer.position(originalPosition);
110+
}
111+
return new ResponseBytes<>(response, array);
112+
}
113+
71114
/**
72115
* @return the unmarshalled response object from the service.
73116
*/

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@
4141
public final class ByteArrayAsyncResponseTransformer<ResponseT> implements
4242
AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> {
4343

44-
private volatile CompletableFuture<byte[]> cf;
44+
private volatile CompletableFuture<ByteBuffer> cf;
4545
private volatile ResponseT response;
4646

4747
@Override
4848
public CompletableFuture<ResponseBytes<ResponseT>> prepare() {
4949
cf = new CompletableFuture<>();
50-
// Using fromByteArrayUnsafe() to avoid unnecessary extra copying of byte array. The data writing has completed and the
51-
// byte array will not be further modified so this is safe
52-
return cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr));
50+
// Using fromByteBufferUnsafe() to avoid unnecessary extra copying of byte array. The data writing has completed and the
51+
// byte buffer will not be further modified so this is safe
52+
return cf.thenApply(buffer -> ResponseBytes.fromByteBufferUnsafe(response, buffer));
5353
}
5454

5555
@Override
@@ -73,13 +73,11 @@ public String name() {
7373
}
7474

7575
static class BaosSubscriber implements Subscriber<ByteBuffer> {
76-
private final CompletableFuture<byte[]> resultFuture;
77-
78-
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
79-
76+
private final CompletableFuture<ByteBuffer> resultFuture;
77+
private DirectAccessByteArrayOutputStream directAccessOutputStream = new DirectAccessByteArrayOutputStream();
8078
private Subscription subscription;
8179

82-
BaosSubscriber(CompletableFuture<byte[]> resultFuture) {
80+
BaosSubscriber(CompletableFuture<ByteBuffer> resultFuture) {
8381
this.resultFuture = resultFuture;
8482
}
8583

@@ -95,19 +93,38 @@ public void onSubscribe(Subscription s) {
9593

9694
@Override
9795
public void onNext(ByteBuffer byteBuffer) {
98-
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
99-
subscription.request(1);
96+
invokeSafely(() -> {
97+
if (byteBuffer.hasArray()) {
98+
directAccessOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(),
99+
byteBuffer.remaining());
100+
} else {
101+
directAccessOutputStream.write(BinaryUtils.copyBytesFrom(byteBuffer));
102+
}
103+
});
100104
}
101105

102106
@Override
103107
public void onError(Throwable throwable) {
104-
baos = null;
108+
directAccessOutputStream = null;
105109
resultFuture.completeExceptionally(throwable);
106110
}
107111

108112
@Override
109113
public void onComplete() {
110-
resultFuture.complete(baos.toByteArray());
114+
resultFuture.complete(directAccessOutputStream.toByteBuffer());
115+
}
116+
}
117+
118+
/**
119+
* Custom ByteArrayOutputStream that exposes internal buffer without copying
120+
*/
121+
static class DirectAccessByteArrayOutputStream extends ByteArrayOutputStream {
122+
123+
/**
124+
* Returns the internal buffer wrapped as ByteBuffer with length set to count.
125+
*/
126+
ByteBuffer toByteBuffer() {
127+
return ByteBuffer.wrap(buf, 0, count);
111128
}
112129
}
113130
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/ResponseBytesTest.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919

20+
import java.nio.ByteBuffer;
2021
import org.junit.jupiter.api.Test;
2122

2223
public class ResponseBytesTest {
2324
private static final Object OBJECT = new Object();
2425
@Test
2526
public void fromByteArrayCreatesCopy() {
26-
byte[] input = new byte[] { 'a' };
27+
byte[] input = {'a'};
2728
byte[] output = ResponseBytes.fromByteArray(OBJECT, input).asByteArrayUnsafe();
2829

2930
input[0] = 'b';
@@ -32,7 +33,7 @@ public void fromByteArrayCreatesCopy() {
3233

3334
@Test
3435
public void asByteArrayCreatesCopy() {
35-
byte[] input = new byte[] { 'a' };
36+
byte[] input = {'a'};
3637
byte[] output = ResponseBytes.fromByteArrayUnsafe(OBJECT, input).asByteArray();
3738

3839
input[0] = 'b';
@@ -41,9 +42,64 @@ public void asByteArrayCreatesCopy() {
4142

4243
@Test
4344
public void fromByteArrayUnsafeAndAsByteArrayUnsafeDoNotCopy() {
44-
byte[] input = new byte[] { 'a' };
45+
byte[] input = {'a'};
4546
byte[] output = ResponseBytes.fromByteArrayUnsafe(OBJECT, input).asByteArrayUnsafe();
4647

4748
assertThat(output).isSameAs(input);
4849
}
50+
51+
@Test
52+
public void fromByteBufferUnsafe_fullBuffer_doesNotCopy() {
53+
byte[] inputBytes = {'a'};
54+
ByteBuffer inputBuffer = ByteBuffer.wrap(inputBytes);
55+
56+
ResponseBytes<Object> responseBytes = ResponseBytes.fromByteBufferUnsafe(OBJECT, inputBuffer);
57+
byte[] outputBytes = responseBytes.asByteArrayUnsafe();
58+
59+
assertThat(inputBuffer.hasArray()).isTrue();
60+
assertThat(inputBuffer.isDirect()).isFalse();
61+
assertThat(outputBytes).isSameAs(inputBytes);
62+
63+
inputBytes[0] = 'b';
64+
assertThat(outputBytes[0]).isEqualTo((byte) 'b');
65+
}
66+
67+
@Test
68+
public void fromByteBufferUnsafe_directBuffer_createsCopy() {
69+
byte[] inputBytes = {'a'};
70+
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1);
71+
directBuffer.put(inputBytes);
72+
directBuffer.flip();
73+
74+
ResponseBytes<Object> responseBytes = ResponseBytes.fromByteBufferUnsafe(OBJECT, directBuffer);
75+
ByteBuffer outputBuffer = responseBytes.asByteBuffer();
76+
byte[] outputBytes = responseBytes.asByteArrayUnsafe();
77+
78+
assertThat(directBuffer.hasArray()).isFalse();
79+
assertThat(directBuffer.isDirect()).isTrue();
80+
assertThat(outputBuffer.isDirect()).isFalse();
81+
assertThat(outputBytes).isEqualTo(inputBytes);
82+
assertThat(outputBytes).isNotSameAs(inputBytes);
83+
84+
inputBytes[0] = 'b';
85+
assertThat(outputBytes[0]).isNotEqualTo((byte) 'b');
86+
}
87+
88+
@Test
89+
public void fromByteBufferUnsafe_bufferWithOffset_createsCopy() {
90+
byte[] inputBytes = "abcdefgh".getBytes();
91+
92+
ByteBuffer slicedBuffer = ByteBuffer.wrap(inputBytes, 2, 3); // "cde"
93+
94+
ResponseBytes<Object> responseBytes = ResponseBytes.fromByteBufferUnsafe(OBJECT, slicedBuffer);
95+
byte[] outputBytes = responseBytes.asByteArrayUnsafe();
96+
97+
assertThat(slicedBuffer.hasArray()).isTrue();
98+
assertThat(outputBytes).isEqualTo("cde".getBytes());
99+
assertThat(outputBytes.length).isEqualTo(3);
100+
assertThat(outputBytes).isNotSameAs(inputBytes);
101+
102+
inputBytes[0] = 'X';
103+
assertThat(outputBytes[0]).isEqualTo((byte) 'c');
104+
}
49105
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2020
import static org.junit.jupiter.api.Assertions.assertTrue;
21-
import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody;
2221
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
2322

2423
import java.io.ByteArrayOutputStream;
@@ -38,12 +37,9 @@
3837
import org.junit.jupiter.api.AfterEach;
3938
import org.junit.jupiter.api.BeforeEach;
4039
import org.junit.jupiter.api.Test;
41-
import org.junit.jupiter.params.ParameterizedTest;
42-
import org.junit.jupiter.params.provider.ValueSource;
4340
import org.reactivestreams.Subscriber;
4441
import org.reactivestreams.Subscription;
4542
import software.amazon.awssdk.core.async.AsyncRequestBody;
46-
import software.amazon.awssdk.core.async.SdkPublisher;
4743
import software.amazon.awssdk.testutils.RandomTempFile;
4844
import software.amazon.awssdk.utils.BinaryUtils;
4945

@@ -236,7 +232,7 @@ public void changingFile_fileGetsDeleted_failsBecauseDeleted() throws Exception
236232

237233
@Test
238234
public void positionNotZero_shouldReadFromPosition() throws Exception {
239-
CompletableFuture<byte[]> future = new CompletableFuture<>();
235+
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
240236
long position = 20L;
241237
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
242238
.path(smallFile)
@@ -249,7 +245,9 @@ public void positionNotZero_shouldReadFromPosition() throws Exception {
249245
asyncRequestBody.subscribe(baosSubscriber);
250246
assertThat(asyncRequestBody.contentLength()).contains(80L);
251247

252-
byte[] bytes = future.get(1, TimeUnit.SECONDS);
248+
ByteBuffer buffer = future.get(1, TimeUnit.SECONDS);
249+
byte[] bytes = new byte[buffer.remaining()];
250+
buffer.get(bytes);
253251

254252
byte[] expected = new byte[80];
255253
try(FileInputStream inputStream = new FileInputStream(smallFile.toFile())) {
@@ -262,7 +260,7 @@ public void positionNotZero_shouldReadFromPosition() throws Exception {
262260

263261
@Test
264262
public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Exception {
265-
CompletableFuture<byte[]> future = new CompletableFuture<>();
263+
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
266264
long position = 20L;
267265
long numBytesToRead = 5L;
268266
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
@@ -277,7 +275,9 @@ public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Excepti
277275
asyncRequestBody.subscribe(baosSubscriber);
278276
assertThat(asyncRequestBody.contentLength()).contains(numBytesToRead);
279277

280-
byte[] bytes = future.get(1, TimeUnit.SECONDS);
278+
ByteBuffer buffer = future.get(1, TimeUnit.SECONDS);
279+
byte[] bytes = new byte[buffer.remaining()];
280+
buffer.get(bytes);
281281

282282
byte[] expected = new byte[5];
283283
try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) {
@@ -290,7 +290,7 @@ public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Excepti
290290

291291
@Test
292292
public void numBytesToReadConfigured_shouldHonor() throws Exception {
293-
CompletableFuture<byte[]> future = new CompletableFuture<>();
293+
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
294294
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
295295
.path(smallFile)
296296
.numBytesToRead(5L)
@@ -302,7 +302,9 @@ public void numBytesToReadConfigured_shouldHonor() throws Exception {
302302
asyncRequestBody.subscribe(baosSubscriber);
303303
assertThat(asyncRequestBody.contentLength()).contains(5L);
304304

305-
byte[] bytes = future.get(1, TimeUnit.SECONDS);
305+
ByteBuffer buffer = future.get(1, TimeUnit.SECONDS);
306+
byte[] bytes = new byte[buffer.remaining()];
307+
buffer.get(bytes);
306308

307309
byte[] expected = new byte[5];
308310
try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) {

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,13 @@ void failedStream_completesExceptionally() {
8383
}
8484

8585
private static String drainPublisherToStr(SdkPublisher<ByteBuffer> publisher) throws Exception {
86-
CompletableFuture<byte[]> bodyFuture = new CompletableFuture<>();
86+
CompletableFuture<ByteBuffer> bodyFuture = new CompletableFuture<>();
8787
publisher.subscribe(new BaosSubscriber(bodyFuture));
88-
byte[] body = bodyFuture.get();
89-
return new String(body);
88+
89+
ByteBuffer buffer = bodyFuture.get();
90+
byte[] bytes = new byte[buffer.remaining()];
91+
buffer.get(bytes);
92+
93+
return new String(bytes);
9094
}
9195
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,26 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18-
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19-
20-
import java.io.File;
2118
import java.io.FileInputStream;
19+
import java.nio.ByteBuffer;
2220
import java.nio.file.Path;
2321
import java.util.ArrayList;
2422
import java.util.List;
2523
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.ExecutionException;
2724
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.TimeoutException;
2925
import org.assertj.core.api.Assertions;
30-
import org.reactivestreams.Publisher;
3126
import software.amazon.awssdk.core.async.AsyncRequestBody;
3227
import software.amazon.awssdk.core.async.SdkPublisher;
33-
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
34-
import software.amazon.awssdk.core.internal.async.SplittingPublisherTest;
3528

3629
public final class SplittingPublisherTestUtils {
3730

3831
public static void verifyIndividualAsyncRequestBody(SdkPublisher<AsyncRequestBody> publisher,
3932
Path file,
4033
int chunkSize) throws Exception {
4134

42-
List<CompletableFuture<byte[]>> futures = new ArrayList<>();
35+
List<CompletableFuture<ByteBuffer>> futures = new ArrayList<>();
4336
publisher.subscribe(requestBody -> {
44-
CompletableFuture<byte[]> baosFuture = new CompletableFuture<>();
37+
CompletableFuture<ByteBuffer> baosFuture = new CompletableFuture<>();
4538
ByteArrayAsyncResponseTransformer.BaosSubscriber subscriber =
4639
new ByteArrayAsyncResponseTransformer.BaosSubscriber(baosFuture);
4740
requestBody.subscribe(subscriber);
@@ -62,7 +55,10 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher<AsyncRequestBod
6255
}
6356
fileInputStream.skip(i * chunkSize);
6457
fileInputStream.read(expected);
65-
byte[] actualBytes = futures.get(i).join();
58+
ByteBuffer actualByteBuffer = futures.get(i).join();
59+
byte[] actualBytes = new byte[actualByteBuffer.remaining()];
60+
actualByteBuffer.get(actualBytes);
61+
6662
Assertions.assertThat(actualBytes).isEqualTo(expected);
6763
}
6864
}

0 commit comments

Comments
 (0)