Skip to content

Commit 8c2b5a6

Browse files
committed
Address comments and avoid copying buffer
1 parent 0ea7599 commit 8c2b5a6

File tree

6 files changed

+103
-40
lines changed

6 files changed

+103
-40
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseBytes.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.InputStream;
2121
import java.io.UncheckedIOException;
22+
import java.nio.ByteBuffer;
2223
import java.util.Arrays;
2324
import software.amazon.awssdk.annotations.SdkPublicApi;
2425
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -68,6 +69,33 @@ public static <ResponseT> ResponseBytes<ResponseT> fromByteArrayUnsafe(ResponseT
6869
return new ResponseBytes<>(response, bytes);
6970
}
7071

72+
/**
73+
* Creates ResponseBytes from a ByteBuffer without copying the underlying data.
74+
*
75+
* @param response the response object containing metadata
76+
* @param buffer the ByteBuffer containing the response body data
77+
* @return ResponseBytes wrapping the buffer data
78+
*/
79+
public static <ResponseT> ResponseBytes<ResponseT> fromByteBufferUnsafe(ResponseT response, ByteBuffer buffer) {
80+
byte[] array;
81+
if (buffer.hasArray()) {
82+
array = buffer.array();
83+
int offset = buffer.arrayOffset() + buffer.position();
84+
int length = buffer.remaining();
85+
if (offset == 0 && length == array.length) {
86+
// Perfect match - use array directly
87+
} else {
88+
// Create view of the relevant portion
89+
array = Arrays.copyOfRange(array, offset, offset + length);
90+
}
91+
} else {
92+
// Direct buffer - must copy to array
93+
array = new byte[buffer.remaining()];
94+
buffer.get(array);
95+
}
96+
return new ResponseBytes<>(response, array);
97+
}
98+
7199
/**
72100
* @return the unmarshalled response object from the service.
73101
*/

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,23 @@
4242
public final class ByteArrayAsyncResponseTransformer<ResponseT> implements
4343
AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> {
4444

45-
private volatile CompletableFuture<byte[]> cf;
45+
private volatile CompletableFuture<ByteBuffer> cf;
4646
private volatile ResponseT response;
4747

4848
@Override
4949
public CompletableFuture<ResponseBytes<ResponseT>> prepare() {
5050
cf = new CompletableFuture<>();
51-
// Using fromByteArrayUnsafe() to avoid unnecessary extra copying of byte array. The data writing has completed and the
51+
// Using fromByteBufferUnsafe() to avoid unnecessary extra copying of byte array. The data writing has completed and the
5252
// byte array will not be further modified so this is safe
53-
return cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr));
53+
return cf.thenApply(buffer -> ResponseBytes.fromByteBufferUnsafe(response, buffer));
5454
}
5555

5656
@Override
5757
public void onResponse(ResponseT response) {
58+
if (response == null) {
59+
return;
60+
}
61+
5862
this.response = response;
5963
}
6064

@@ -85,13 +89,11 @@ public String name() {
8589
}
8690

8791
static class BaosSubscriber implements Subscriber<ByteBuffer> {
88-
private final CompletableFuture<byte[]> resultFuture;
89-
90-
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
91-
92+
private final CompletableFuture<ByteBuffer> resultFuture;
93+
private DirectAccessByteArrayOutputStream directAccessOutputStream = new DirectAccessByteArrayOutputStream();
9294
private Subscription subscription;
9395

94-
BaosSubscriber(CompletableFuture<byte[]> resultFuture) {
96+
BaosSubscriber(CompletableFuture<ByteBuffer> resultFuture) {
9597
this.resultFuture = resultFuture;
9698
}
9799

@@ -107,19 +109,38 @@ public void onSubscribe(Subscription s) {
107109

108110
@Override
109111
public void onNext(ByteBuffer byteBuffer) {
110-
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
111-
subscription.request(1);
112+
invokeSafely(() -> {
113+
if (byteBuffer.hasArray()) {
114+
directAccessOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(),
115+
byteBuffer.remaining());
116+
} else {
117+
directAccessOutputStream.write(BinaryUtils.copyBytesFrom(byteBuffer));
118+
}
119+
});
112120
}
113121

114122
@Override
115123
public void onError(Throwable throwable) {
116-
baos = null;
124+
directAccessOutputStream = null;
117125
resultFuture.completeExceptionally(throwable);
118126
}
119127

120128
@Override
121129
public void onComplete() {
122-
resultFuture.complete(baos.toByteArray());
130+
resultFuture.complete(directAccessOutputStream.toByteBuffer());
131+
}
132+
}
133+
134+
/**
135+
* Custom ByteArrayOutputStream that exposes internal buffer without copying
136+
*/
137+
static class DirectAccessByteArrayOutputStream extends ByteArrayOutputStream {
138+
139+
/**
140+
* Returns the internal buffer wrapped as ByteBuffer with length set to count.
141+
*/
142+
ByteBuffer toByteBuffer() {
143+
return ByteBuffer.wrap(buf, 0, count);
123144
}
124145
}
125146
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@
3434
import software.amazon.awssdk.utils.Logger;
3535
import software.amazon.awssdk.utils.async.SimplePublisher;
3636

37+
/**
38+
* A splitting transformer that creates individual {@link ByteArrayAsyncResponseTransformer} instances for each part of a
39+
* multipart download. This is necessary to support retries of individual part downloads.
40+
*
41+
* <p>
42+
* This class is created by {@link ByteArrayAsyncResponseTransformer#split} and used internally by the multipart
43+
* download logic.
44+
*/
45+
3746
@SdkInternalApi
3847
public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> {
3948
private static final Logger log = Logger.loggerFor(ByteArraySplittingTransformer.class);
@@ -56,6 +65,10 @@ public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<As
5665
*/
5766
private final AtomicBoolean emitting = new AtomicBoolean(false);
5867

68+
/**
69+
* Synchronization lock that protects the {@code onStreamCalled} flag and cancellation
70+
* workflow from concurrent access. Ensures thread-safety of subscription cancellation.
71+
*/
5972
private final Object lock = new Object();
6073

6174
/**
@@ -134,10 +147,9 @@ private boolean doEmit() {
134147
if (isCancelled.get()) {
135148
return true;
136149
}
137-
if (outstandingDemand.get() > 0) {
138-
demand = outstandingDemand.decrementAndGet();
139-
downstreamSubscriber.onNext(new IndividualTransformer(nextPartNumber.getAndIncrement()));
140-
}
150+
151+
demand = outstandingDemand.decrementAndGet();
152+
downstreamSubscriber.onNext(new IndividualTransformer(nextPartNumber.getAndIncrement()));
141153
}
142154
return false;
143155
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2020
import static org.junit.jupiter.api.Assertions.assertTrue;
21-
import static software.amazon.awssdk.core.internal.async.SplittingPublisherTestUtils.verifyIndividualAsyncRequestBody;
2221
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
2322

2423
import java.io.ByteArrayOutputStream;
@@ -38,12 +37,9 @@
3837
import org.junit.jupiter.api.AfterEach;
3938
import org.junit.jupiter.api.BeforeEach;
4039
import org.junit.jupiter.api.Test;
41-
import org.junit.jupiter.params.ParameterizedTest;
42-
import org.junit.jupiter.params.provider.ValueSource;
4340
import org.reactivestreams.Subscriber;
4441
import org.reactivestreams.Subscription;
4542
import software.amazon.awssdk.core.async.AsyncRequestBody;
46-
import software.amazon.awssdk.core.async.SdkPublisher;
4743
import software.amazon.awssdk.testutils.RandomTempFile;
4844
import software.amazon.awssdk.utils.BinaryUtils;
4945

@@ -236,7 +232,7 @@ public void changingFile_fileGetsDeleted_failsBecauseDeleted() throws Exception
236232

237233
@Test
238234
public void positionNotZero_shouldReadFromPosition() throws Exception {
239-
CompletableFuture<byte[]> future = new CompletableFuture<>();
235+
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
240236
long position = 20L;
241237
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
242238
.path(smallFile)
@@ -249,7 +245,9 @@ public void positionNotZero_shouldReadFromPosition() throws Exception {
249245
asyncRequestBody.subscribe(baosSubscriber);
250246
assertThat(asyncRequestBody.contentLength()).contains(80L);
251247

252-
byte[] bytes = future.get(1, TimeUnit.SECONDS);
248+
ByteBuffer buffer = future.get(1, TimeUnit.SECONDS);
249+
byte[] bytes = new byte[buffer.remaining()];
250+
buffer.get(bytes);
253251

254252
byte[] expected = new byte[80];
255253
try(FileInputStream inputStream = new FileInputStream(smallFile.toFile())) {
@@ -262,7 +260,7 @@ public void positionNotZero_shouldReadFromPosition() throws Exception {
262260

263261
@Test
264262
public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Exception {
265-
CompletableFuture<byte[]> future = new CompletableFuture<>();
263+
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
266264
long position = 20L;
267265
long numBytesToRead = 5L;
268266
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
@@ -277,7 +275,9 @@ public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Excepti
277275
asyncRequestBody.subscribe(baosSubscriber);
278276
assertThat(asyncRequestBody.contentLength()).contains(numBytesToRead);
279277

280-
byte[] bytes = future.get(1, TimeUnit.SECONDS);
278+
ByteBuffer buffer = future.get(1, TimeUnit.SECONDS);
279+
byte[] bytes = new byte[buffer.remaining()];
280+
buffer.get(bytes);
281281

282282
byte[] expected = new byte[5];
283283
try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) {
@@ -290,7 +290,7 @@ public void bothPositionAndNumBytesToReadConfigured_shouldHonor() throws Excepti
290290

291291
@Test
292292
public void numBytesToReadConfigured_shouldHonor() throws Exception {
293-
CompletableFuture<byte[]> future = new CompletableFuture<>();
293+
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
294294
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
295295
.path(smallFile)
296296
.numBytesToRead(5L)
@@ -302,7 +302,9 @@ public void numBytesToReadConfigured_shouldHonor() throws Exception {
302302
asyncRequestBody.subscribe(baosSubscriber);
303303
assertThat(asyncRequestBody.contentLength()).contains(5L);
304304

305-
byte[] bytes = future.get(1, TimeUnit.SECONDS);
305+
ByteBuffer buffer = future.get(1, TimeUnit.SECONDS);
306+
byte[] bytes = new byte[buffer.remaining()];
307+
buffer.get(bytes);
306308

307309
byte[] expected = new byte[5];
308310
try (FileInputStream inputStream = new FileInputStream(smallFile.toFile())) {

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,13 @@ void failedStream_completesExceptionally() {
8383
}
8484

8585
private static String drainPublisherToStr(SdkPublisher<ByteBuffer> publisher) throws Exception {
86-
CompletableFuture<byte[]> bodyFuture = new CompletableFuture<>();
86+
CompletableFuture<ByteBuffer> bodyFuture = new CompletableFuture<>();
8787
publisher.subscribe(new BaosSubscriber(bodyFuture));
88-
byte[] body = bodyFuture.get();
89-
return new String(body);
88+
89+
ByteBuffer buffer = bodyFuture.get();
90+
byte[] bytes = new byte[buffer.remaining()];
91+
buffer.get(bytes);
92+
93+
return new String(bytes);
9094
}
9195
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTestUtils.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,26 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18-
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19-
20-
import java.io.File;
2118
import java.io.FileInputStream;
19+
import java.nio.ByteBuffer;
2220
import java.nio.file.Path;
2321
import java.util.ArrayList;
2422
import java.util.List;
2523
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.ExecutionException;
2724
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.TimeoutException;
2925
import org.assertj.core.api.Assertions;
30-
import org.reactivestreams.Publisher;
3126
import software.amazon.awssdk.core.async.AsyncRequestBody;
3227
import software.amazon.awssdk.core.async.SdkPublisher;
33-
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
34-
import software.amazon.awssdk.core.internal.async.SplittingPublisherTest;
3528

3629
public final class SplittingPublisherTestUtils {
3730

3831
public static void verifyIndividualAsyncRequestBody(SdkPublisher<AsyncRequestBody> publisher,
3932
Path file,
4033
int chunkSize) throws Exception {
4134

42-
List<CompletableFuture<byte[]>> futures = new ArrayList<>();
35+
List<CompletableFuture<ByteBuffer>> futures = new ArrayList<>();
4336
publisher.subscribe(requestBody -> {
44-
CompletableFuture<byte[]> baosFuture = new CompletableFuture<>();
37+
CompletableFuture<ByteBuffer> baosFuture = new CompletableFuture<>();
4538
ByteArrayAsyncResponseTransformer.BaosSubscriber subscriber =
4639
new ByteArrayAsyncResponseTransformer.BaosSubscriber(baosFuture);
4740
requestBody.subscribe(subscriber);
@@ -62,7 +55,10 @@ public static void verifyIndividualAsyncRequestBody(SdkPublisher<AsyncRequestBod
6255
}
6356
fileInputStream.skip(i * chunkSize);
6457
fileInputStream.read(expected);
65-
byte[] actualBytes = futures.get(i).join();
58+
ByteBuffer actualByteBuffer = futures.get(i).join();
59+
byte[] actualBytes = new byte[actualByteBuffer.remaining()];
60+
actualByteBuffer.get(actualBytes);
61+
6662
Assertions.assertThat(actualBytes).isEqualTo(expected);
6763
}
6864
}

0 commit comments

Comments
 (0)