Skip to content

Commit 79c7759

Browse files
committed
Step5: Reproduce and fixes #51
1 parent 09de6c7 commit 79c7759

File tree

2 files changed

+60
-9
lines changed

2 files changed

+60
-9
lines changed

src/main/java/io/reactiverse/awssdk/VertxNioAsyncHttpClient.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,22 @@
55
import io.reactiverse.awssdk.reactivestreams.ReadStreamPublisher;
66
import io.vertx.core.Context;
77
import io.vertx.core.Vertx;
8-
import io.vertx.core.http.*;
8+
import io.vertx.core.http.HttpClient;
9+
import io.vertx.core.http.HttpClientOptions;
10+
import io.vertx.core.http.HttpClientRequest;
11+
import io.vertx.core.http.HttpHeaders;
12+
import io.vertx.core.http.HttpMethod;
13+
import io.vertx.core.http.RequestOptions;
914
import software.amazon.awssdk.http.SdkHttpFullResponse;
1015
import software.amazon.awssdk.http.SdkHttpRequest;
1116
import software.amazon.awssdk.http.SdkHttpResponse;
1217
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
1318
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
1419
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
1520
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
21+
import software.amazon.awssdk.utils.StringUtils;
1622

17-
import java.util.Optional;
23+
import java.net.URI;
1824
import java.util.concurrent.CompletableFuture;
1925

2026
import static java.util.Objects.requireNonNull;
@@ -90,13 +96,21 @@ void executeOnContext(AsyncExecuteRequest asyncExecuteRequest, CompletableFuture
9096
}
9197
}
9298

93-
private static RequestOptions getRequestOptions(SdkHttpRequest request) {
94-
return new RequestOptions()
95-
.setHost(request.host())
96-
.setPort(request.port())
97-
.setURI(request.encodedPath())
98-
.setSsl("https".equals(request.protocol()));
99-
}
99+
private static RequestOptions getRequestOptions(SdkHttpRequest request) {
100+
return new RequestOptions()
101+
.setHost(request.host())
102+
.setPort(request.port())
103+
.setURI(createRelativeUri(request.getUri()))
104+
.setSsl("https".equals(request.protocol()));
105+
}
106+
107+
private static String createRelativeUri(URI uri) {
108+
return (StringUtils.isEmpty(uri.getPath()) ? "/" : uri.getPath()) +
109+
// AWS requires query parameters to be encoded as defined by RFC 3986.
110+
// see https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
111+
// uri.toASCIIString() returns the URI encoded in this manner
112+
(StringUtils.isEmpty(uri.getQuery()) ? "" : "?" + uri.toASCIIString().split("\\?")[1]);
113+
}
100114

101115
@Override
102116
public void close() {

src/test/java/io/reactiverse/awssdk/integration/s3/VertxS3ClientSpec.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@
3131
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
3232
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
3333
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
34+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
3435
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3536
import software.amazon.awssdk.services.s3.model.S3Object;
3637

38+
import java.util.ArrayList;
39+
import java.util.List;
3740
import java.util.concurrent.TimeUnit;
3841
import java.util.concurrent.atomic.AtomicBoolean;
3942

@@ -181,6 +184,28 @@ void downloadImageFromBucketWithoutSettingResponseHandler(Vertx vertx, VertxTest
181184
.subscribe(getRes -> ctx.completeNow(), ctx::failNow);
182185
}
183186

187+
@Test
188+
@Order(8)
189+
void listObjectsV2(Vertx vertx, VertxTestContext ctx) throws Exception {
190+
final Context originalContext = vertx.getOrCreateContext();
191+
final S3AsyncClient s3 = s3(originalContext);
192+
single(s3.putObject(b -> putObjectReq(b, "obj1"), AsyncRequestBody.fromString("hello")))
193+
.flatMap(putObjectResponse1 -> single(s3.putObject(b -> putObjectReq(b, "obj2"), AsyncRequestBody.fromString("hi"))))
194+
.flatMap(putObjectResponse2 -> single(s3.listObjectsV2(VertxS3ClientSpec::listObjectsV2Req))
195+
.flatMap(listObjectsV2Response1 -> single(s3.listObjectsV2(b -> listObjectsV2ReqWithContToken(b, listObjectsV2Response1.nextContinuationToken())))
196+
.map(listObjectsV2Response2 -> {
197+
List<S3Object> allObjects = new ArrayList<>(listObjectsV2Response1.contents());
198+
allObjects.addAll(listObjectsV2Response2.contents());
199+
return allObjects;
200+
})
201+
))
202+
.subscribe(allObjects -> {
203+
ctx.verify(() -> {
204+
assertEquals(3, allObjects.size());
205+
ctx.completeNow();
206+
});
207+
}, ctx::failNow);
208+
}
184209

185210
/* Utility methods */
186211
private static Single<AsyncFile> readFileFromDisk(Vertx vertx) {
@@ -207,4 +232,16 @@ private static GetObjectRequest.Builder downloadImgReq(GetObjectRequest.Builder
207232
return gor.key(IMG_S3_NAME).bucket(BUCKET_NAME);
208233
}
209234

235+
private static ListObjectsV2Request.Builder listObjectsV2Req(ListObjectsV2Request.Builder lovr) {
236+
return lovr.maxKeys(2).bucket(BUCKET_NAME);
237+
}
238+
239+
private static ListObjectsV2Request.Builder listObjectsV2ReqWithContToken(ListObjectsV2Request.Builder lovr, String token) {
240+
return lovr.maxKeys(2).bucket(BUCKET_NAME).continuationToken(token);
241+
}
242+
243+
private static PutObjectRequest.Builder putObjectReq(PutObjectRequest.Builder por, String key) {
244+
return por.bucket(BUCKET_NAME).key(key);
245+
}
246+
210247
}

0 commit comments

Comments
 (0)