Skip to content

Commit 3811ea8

Browse files
committed
* merge upstream's master
* upgrade to 4.0.0.CR1
1 parent 022e09c commit 3811ea8

File tree

4 files changed

+125
-35
lines changed

4 files changed

+125
-35
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
val vertxVersion = "3.9.4"
1+
val vertxVersion = "4.0.0.CR1"
22
val awsSdkVersion = "2.15.23"
33
val junit5Version = "5.4.0"
44
val logbackVersion = "1.2.3"

src/main/java/io/reactiverse/awssdk/VertxNioAsyncHttpClient.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import software.amazon.awssdk.utils.StringUtils;
2222

2323
import java.net.URI;
24+
import java.util.Arrays;
25+
import java.util.List;
2426
import java.util.concurrent.CompletableFuture;
27+
import java.util.stream.Collectors;
2528

2629
import static java.util.Objects.requireNonNull;
2730

@@ -66,42 +69,49 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest)
6669
void executeOnContext(AsyncExecuteRequest asyncExecuteRequest, CompletableFuture<Void> fut) {
6770
final SdkHttpRequest request = asyncExecuteRequest.request();
6871
final SdkAsyncHttpResponseHandler responseHandler = asyncExecuteRequest.responseHandler();
69-
70-
final HttpMethod method = MethodConverter.awsToVertx(request.method());
7172
final RequestOptions options = getRequestOptions(request);
72-
final HttpClientRequest vRequest = client.request(method, options).setFollowRedirects(true);
73-
request.headers().forEach((headerName, headerValues) ->
74-
vRequest.putHeader(headerName, String.join(",", headerValues))
75-
);
76-
vRequest.putHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
77-
vRequest.exceptionHandler(error -> {
73+
client.request(options, ar -> {
74+
if (ar.failed()) {
75+
responseHandler.onError(ar.cause());
76+
return;
77+
}
78+
HttpClientRequest vRequest = ar.result();
79+
vRequest.onFailure(error -> {
7880
responseHandler.onError(error);
7981
fut.completeExceptionally(error);
80-
});
81-
vRequest.handler(vResponse -> {
82+
});
83+
vRequest.onSuccess(vResponse -> {
8284
final SdkHttpFullResponse.Builder builder = SdkHttpResponse.builder()
83-
.statusCode(vResponse.statusCode())
84-
.statusText(vResponse.statusMessage());
85+
.statusCode(vResponse.statusCode())
86+
.statusText(vResponse.statusMessage());
8587
vResponse.headers().forEach(e ->
86-
builder.appendHeader(e.getKey(), e.getValue())
88+
builder.appendHeader(e.getKey(), e.getValue())
8789
);
8890
responseHandler.onHeaders(builder.build());
8991
responseHandler.onStream(new ReadStreamPublisher<>(vResponse, fut));
90-
});
91-
final SdkHttpContentPublisher publisher = asyncExecuteRequest.requestContentPublisher();
92-
if (publisher != null) {
92+
});
93+
final SdkHttpContentPublisher publisher = asyncExecuteRequest.requestContentPublisher();
94+
if (publisher != null) {
9395
publisher.subscribe(new HttpClientRequestSubscriber(vRequest));
94-
} else {
96+
} else {
9597
vRequest.end();
96-
}
98+
}
99+
});
97100
}
98101

99102
private static RequestOptions getRequestOptions(SdkHttpRequest request) {
100-
return new RequestOptions()
103+
RequestOptions options = new RequestOptions()
104+
.setMethod(MethodConverter.awsToVertx(request.method()))
101105
.setHost(request.host())
102106
.setPort(request.port())
103107
.setURI(createRelativeUri(request.getUri()))
108+
.setFollowRedirects(true)
104109
.setSsl("https".equals(request.protocol()));
110+
request.headers().forEach((name, values) -> {
111+
options.addHeader(name, values.stream().map(s -> (CharSequence)s).collect(Collectors.toList()));
112+
});
113+
options.addHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
114+
return options;
105115
}
106116

107117
private static String createRelativeUri(URI uri) {

src/test/java/io/reactiverse/awssdk/RetryContextTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void setupFailServer(Vertx vertx, VertxTestContext ctx) {
5959
}
6060

6161
})
62-
.listen(ctx.completing());
62+
.listen(ctx.succeedingThenComplete());
6363
}
6464

6565
@Test

src/test/java/io/reactiverse/awssdk/integration/s3/VertxS3ClientSpec.java

Lines changed: 94 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,17 @@
66
import io.reactiverse.awssdk.integration.LocalStackBaseSpec;
77
import io.reactiverse.awssdk.reactivestreams.ReadStreamPublisher;
88
import io.reactivex.Single;
9+
import io.vertx.codegen.annotations.Nullable;
10+
import io.vertx.core.AsyncResult;
911
import io.vertx.core.Context;
12+
import io.vertx.core.Future;
13+
import io.vertx.core.Handler;
1014
import io.vertx.core.Vertx;
1115
import io.vertx.core.buffer.Buffer;
1216
import io.vertx.core.eventbus.MessageProducer;
1317
import io.vertx.core.file.OpenOptions;
18+
import io.vertx.core.streams.Pump;
19+
import io.vertx.core.streams.WriteStream;
1420
import io.vertx.junit5.Timeout;
1521
import io.vertx.junit5.VertxExtension;
1622
import io.vertx.junit5.VertxTestContext;
@@ -155,18 +161,50 @@ void downloadImageFromBucket(Vertx vertx, VertxTestContext ctx) throws Exception
155161
void downloadImageFromBucketToPump(Vertx vertx, VertxTestContext ctx) throws Exception {
156162
final Context originalContext = vertx.getOrCreateContext();
157163
final S3AsyncClient s3 = s3(originalContext);
158-
final String ebAddress = "s3-forwarded";
159-
final MessageProducer<Buffer> producer = vertx.eventBus().sender(ebAddress);
160-
final Buffer received = Buffer.buffer();
164+
Buffer received = Buffer.buffer();
161165
AtomicBoolean handlerCalled = new AtomicBoolean(false);
162-
VertxAsyncResponseTransformer<GetObjectResponse> transformer = new VertxAsyncResponseTransformer<>(producer);
163-
transformer.setResponseHandler(resp -> {
164-
handlerCalled.set(true);
165-
});
166-
vertx.eventBus().<Buffer>consumer(ebAddress, msg -> {
166+
VertxAsyncResponseTransformer<GetObjectResponse> transformer = new VertxAsyncResponseTransformer<>(new WriteStream<Buffer>() {
167+
@Override
168+
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
169+
return null;
170+
}
171+
172+
@Override
173+
public Future<Void> write(Buffer data) {
174+
received.appendBuffer(data);
175+
return Future.succeededFuture();
176+
}
177+
178+
@Override
179+
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
180+
received.appendBuffer(data);
181+
handler.handle(null);
182+
}
183+
184+
@Override
185+
public void end(Handler<AsyncResult<Void>> handler) {
167186
assertTrue(handlerCalled.get(), "Response handler should have been called before first bytes are received");
168-
received.appendBuffer(msg.body());
169187
if (received.length() == fileSize) ctx.completeNow();
188+
handler.handle(null);
189+
}
190+
191+
@Override
192+
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
193+
return null;
194+
}
195+
196+
@Override
197+
public boolean writeQueueFull() {
198+
return false;
199+
}
200+
201+
@Override
202+
public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
203+
return null;
204+
}
205+
});
206+
transformer.setResponseHandler(resp -> {
207+
handlerCalled.set(true);
170208
});
171209
single(s3.getObject(VertxS3ClientSpec::downloadImgReq, transformer))
172210
.subscribe(getRes -> {}, ctx::failNow);
@@ -175,11 +213,53 @@ void downloadImageFromBucketToPump(Vertx vertx, VertxTestContext ctx) throws Exc
175213
@Test
176214
@Order(7)
177215
void downloadImageFromBucketWithoutSettingResponseHandler(Vertx vertx, VertxTestContext ctx) throws Exception {
178-
final Context originalContext = vertx.getOrCreateContext();
179-
final S3AsyncClient s3 = s3(originalContext);
180-
final String ebAddress = "s3-forwarded";
181-
final MessageProducer<Buffer> producer = vertx.eventBus().sender(ebAddress);
182-
VertxAsyncResponseTransformer<GetObjectResponse> transformer = new VertxAsyncResponseTransformer<>(producer);
216+
final Context originalContext = vertx.getOrCreateContext();
217+
final S3AsyncClient s3 = s3(originalContext);
218+
final Buffer received = Buffer.buffer();
219+
AtomicBoolean handlerCalled = new AtomicBoolean(false);
220+
VertxAsyncResponseTransformer<GetObjectResponse> transformer = new VertxAsyncResponseTransformer<>(new WriteStream<Buffer>() {
221+
@Override
222+
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
223+
return null;
224+
}
225+
226+
@Override
227+
public Future<Void> write(Buffer data) {
228+
received.appendBuffer(data);
229+
return Future.succeededFuture();
230+
}
231+
232+
@Override
233+
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
234+
received.appendBuffer(data);
235+
handler.handle(null);
236+
}
237+
238+
@Override
239+
public void end(Handler<AsyncResult<Void>> handler) {
240+
assertTrue(handlerCalled.get(), "Response handler should have been called before first bytes are received");
241+
if (received.length() == fileSize) ctx.completeNow();
242+
handler.handle(null);
243+
}
244+
245+
@Override
246+
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
247+
return null;
248+
}
249+
250+
@Override
251+
public boolean writeQueueFull() {
252+
return false;
253+
}
254+
255+
@Override
256+
public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
257+
return null;
258+
}
259+
});
260+
transformer.setResponseHandler(resp -> {
261+
handlerCalled.set(true);
262+
});
183263
single(s3.getObject(VertxS3ClientSpec::downloadImgReq, transformer))
184264
.subscribe(getRes -> ctx.completeNow(), ctx::failNow);
185265
}

0 commit comments

Comments
 (0)