Skip to content

Commit 74f9474

Browse files
chaykinKirill Chaykindavidh44
authored
Allows users to configure contentType for BlockingInputStreamAsyncRequestBody (#5234)
* Allows users to configure contentType for BlockingInputStreamAsyncRequestBody (#5143) * Run new-change (#5143) * Add tests for BlockingInputStreamAsyncRequestBody (#5143) * Update tests --------- Co-authored-by: Kirill Chaykin <[email protected]> Co-authored-by: hdavidh <[email protected]> Co-authored-by: David Ho <[email protected]>
1 parent a683856 commit 74f9474

File tree

5 files changed

+86
-5
lines changed

5 files changed

+86
-5
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "Amazon S3",
4+
"contributor": "chaykin",
5+
"description": "Allow user to configure content type for BlockingInputStreamAsyncRequestBody"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,8 @@ static AsyncRequestBody fromInputStream(Consumer<AsyncRequestBodyFromInputStream
384384
/**
385385
* Creates a {@link BlockingInputStreamAsyncRequestBody} to use for writing an input stream to the downstream service.
386386
*
387-
* <p>By default, it will time out if streaming hasn't started within 10 seconds, and you can configure the timeout
388-
* via {@link BlockingInputStreamAsyncRequestBody#builder()}
387+
* <p>By default, it will time out if streaming hasn't started within 10 seconds, and use application/octet-stream as
388+
* content type. You can configure it via {@link BlockingInputStreamAsyncRequestBody#builder()}
389389
* <p><b>Example Usage</b>
390390
*
391391
* <p>

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import software.amazon.awssdk.annotations.SdkPublicApi;
2727
import software.amazon.awssdk.core.exception.NonRetryableException;
2828
import software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream;
29+
import software.amazon.awssdk.core.internal.util.Mimetype;
2930
import software.amazon.awssdk.core.internal.util.NoopSubscription;
3031
import software.amazon.awssdk.utils.Validate;
3132
import software.amazon.awssdk.utils.async.InputStreamConsumingPublisher;
@@ -39,14 +40,17 @@
3940
@SdkPublicApi
4041
public final class BlockingInputStreamAsyncRequestBody implements AsyncRequestBody {
4142
private static final Duration DEFAULT_SUBSCRIBE_TIMEOUT = Duration.ofSeconds(10);
43+
private static final String DEFAULT_CONTENT_TYPE = Mimetype.MIMETYPE_OCTET_STREAM;
4244
private final InputStreamConsumingPublisher delegate = new InputStreamConsumingPublisher();
4345
private final CountDownLatch subscribedLatch = new CountDownLatch(1);
4446
private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
4547
private final Long contentLength;
48+
private final String contentType;
4649
private final Duration subscribeTimeout;
4750

4851
BlockingInputStreamAsyncRequestBody(Builder builder) {
4952
this.contentLength = builder.contentLength;
53+
this.contentType = builder.contentType != null ? builder.contentType : DEFAULT_CONTENT_TYPE;
5054
this.subscribeTimeout = Validate.isPositiveOrNull(builder.subscribeTimeout, "subscribeTimeout") != null ?
5155
builder.subscribeTimeout :
5256
DEFAULT_SUBSCRIBE_TIMEOUT;
@@ -64,6 +68,11 @@ public Optional<Long> contentLength() {
6468
return Optional.ofNullable(contentLength);
6569
}
6670

71+
@Override
72+
public String contentType() {
73+
return contentType;
74+
}
75+
6776
/**
6877
* Block the calling thread and write the provided input stream to the downstream service.
6978
*
@@ -123,6 +132,7 @@ private void waitForSubscriptionIfNeeded() throws InterruptedException {
123132
public static final class Builder {
124133
private Duration subscribeTimeout;
125134
private Long contentLength;
135+
private String contentType;
126136

127137
private Builder() {
128138
}
@@ -152,6 +162,17 @@ public Builder contentLength(Long contentLength) {
152162
return this;
153163
}
154164

165+
/**
166+
* The content type of the output stream.
167+
*
168+
* @param contentType the content type
169+
* @return Returns a reference to this object so that method calls can be chained together.
170+
*/
171+
public Builder contentType(String contentType) {
172+
this.contentType = contentType;
173+
return this;
174+
}
175+
155176
public BlockingInputStreamAsyncRequestBody build() {
156177
return new BlockingInputStreamAsyncRequestBody(this);
157178
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBodyTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.ByteArrayInputStream;
2424
import java.nio.ByteBuffer;
2525
import java.time.Duration;
26+
import java.util.Optional;
2627
import java.util.concurrent.Executors;
2728
import java.util.concurrent.ScheduledExecutorService;
2829
import org.junit.jupiter.api.Test;
@@ -76,4 +77,26 @@ public void doBlockingWrite_writesToSubscriber() {
7677
}
7778
}
7879

80+
@Test
81+
public void builder_withContentType_buildsCorrectly() {
82+
BlockingInputStreamAsyncRequestBody.Builder requestBodyBuilder = BlockingInputStreamAsyncRequestBody.builder();
83+
BlockingInputStreamAsyncRequestBody requestBody = requestBodyBuilder
84+
.contentLength(20L)
85+
.contentType("text/plain")
86+
.build();
87+
88+
assertThat(requestBody.contentLength()).isEqualTo(Optional.of(20L));
89+
assertThat(requestBody.contentType()).isEqualTo("text/plain");
90+
}
91+
92+
@Test
93+
public void builder_withoutContentType_defaultsToOctetStream() {
94+
BlockingInputStreamAsyncRequestBody.Builder requestBodyBuilder = BlockingInputStreamAsyncRequestBody.builder();
95+
BlockingInputStreamAsyncRequestBody requestBody = requestBodyBuilder
96+
.contentLength(20L)
97+
.build();
98+
99+
assertThat(requestBody.contentLength()).isEqualTo(Optional.of(20L));
100+
assertThat(requestBody.contentType()).isEqualTo("application/octet-stream");
101+
}
79102
}

services/s3/src/it/java/software/amazon/awssdk/services/s3/PutObjectIntegrationTest.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,27 @@
2828
import java.nio.charset.StandardCharsets;
2929
import java.util.ArrayList;
3030
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
3132
import org.junit.AfterClass;
3233
import org.junit.BeforeClass;
3334
import org.junit.Test;
35+
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
3436
import software.amazon.awssdk.core.sync.RequestBody;
3537
import software.amazon.awssdk.http.ContentStreamProvider;
38+
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
39+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
40+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
3641

3742
/**
3843
* Integration tests for {@code PutObject}.
3944
*/
4045
public class PutObjectIntegrationTest extends S3IntegrationTestBase {
4146
private static final String BUCKET = temporaryBucketName(PutObjectIntegrationTest.class);
42-
private static final String KEY = "key";
47+
private static final String ASYNC_KEY = "async-key";
48+
private static final String SYNC_KEY = "sync-key";
49+
4350
private static final byte[] CONTENT = "Hello".getBytes(StandardCharsets.UTF_8);
51+
private static final String TEXT_CONTENT_TYPE = "text/plain";
4452

4553
@BeforeClass
4654
public static void setUp() throws Exception {
@@ -56,13 +64,36 @@ public static void tearDown() {
5664
@Test
5765
public void objectInputStreamsAreClosed() {
5866
TestContentProvider provider = new TestContentProvider(CONTENT);
59-
s3.putObject(r -> r.bucket(BUCKET).key(KEY), RequestBody.fromContentProvider(provider, CONTENT.length, "binary/octet-stream"));
67+
s3.putObject(r -> r.bucket(BUCKET).key(SYNC_KEY),
68+
RequestBody.fromContentProvider(provider, CONTENT.length, "binary/octet-stream"));
6069

6170
for (CloseTrackingInputStream is : provider.getCreatedStreams()) {
6271
assertThat(is.isClosed()).isTrue();
6372
}
6473
}
6574

75+
@Test
76+
public void blockingInputStreamAsyncRequestBody_withContentType_isHonored() {
77+
BlockingInputStreamAsyncRequestBody requestBody =
78+
BlockingInputStreamAsyncRequestBody.builder()
79+
.contentLength((long) CONTENT.length)
80+
.contentType(TEXT_CONTENT_TYPE)
81+
.build();
82+
83+
PutObjectRequest.Builder request = PutObjectRequest.builder()
84+
.bucket(BUCKET)
85+
.key(ASYNC_KEY);
86+
87+
CompletableFuture<PutObjectResponse> responseFuture = s3Async.putObject(request.build(), requestBody);
88+
requestBody.writeInputStream(new ByteArrayInputStream(CONTENT));
89+
responseFuture.join();
90+
91+
HeadObjectResponse response = s3Async.headObject(r -> r.bucket(BUCKET).key(ASYNC_KEY)).join();
92+
93+
assertThat(response.contentLength()).isEqualTo(CONTENT.length);
94+
assertThat(response.contentType()).isEqualTo(TEXT_CONTENT_TYPE);
95+
}
96+
6697
@Test
6798
public void s3Client_usingHttpAndDisableChunkedEncoding() {
6899
try (S3Client s3Client = s3ClientBuilder()
@@ -71,7 +102,7 @@ public void s3Client_usingHttpAndDisableChunkedEncoding() {
71102
.chunkedEncodingEnabled(false)
72103
.build())
73104
.build()) {
74-
assertThat(s3Client.putObject(b -> b.bucket(BUCKET).key(KEY), RequestBody.fromBytes(
105+
assertThat(s3Client.putObject(b -> b.bucket(BUCKET).key(SYNC_KEY), RequestBody.fromBytes(
75106
"helloworld".getBytes()))).isNotNull();
76107
}
77108
}

0 commit comments

Comments
 (0)