Skip to content

Commit 06bc8dc

Browse files
committed
feat(ws-next): support connect & client options customization
1 parent add2d9a commit 06bc8dc

File tree

8 files changed

+143
-14
lines changed

8 files changed

+143
-14
lines changed

docs/src/main/asciidoc/websockets-next-reference.adoc

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,21 +1291,25 @@ public class MyBean {
12911291
WebSocketClientConnection connection = connector
12921292
.baseUri(uri) <2>
12931293
.pathParam("name", "Roxanne") <3>
1294+
.customizeOptions((connectOptions, clientOptions) -> { <4>
1295+
// ...
1296+
})
12941297
.connectAndAwait();
1295-
connection.sendTextAndAwait("Hi!"); <4>
1298+
connection.sendTextAndAwait("Hi!"); <5>
12961299
}
12971300
}
12981301
----
12991302
<1> Inject the connector for `ClientEndpoint`.
13001303
<2> If the base `URI` is not supplied we attempt to obtain the value from the config. The key consists of the client id and the `.base-uri` suffix.
13011304
<3> Set the path param value. Throws `IllegalArgumentException` if the client endpoint path does not contain a parameter with the given name.
1302-
<4> Use the connection to send messages, if needed.
1305+
<4> Optionally, customize client and connection options not configurable with the connector API or Quarkus configuration.
1306+
<5> Use the connection to send messages, if needed.
13031307

13041308
NOTE: If an application attempts to inject a connector for a missing endpoint, an error is thrown.
13051309

13061310
Connectors are not thread-safe and should not be used concurrently.
13071311
Connectors should also not be reused.
1308-
If you need to create multiple connections in a row you'll need to obtain a new connetor instance programmatically using `Instance#get()`:
1312+
If you need to create multiple connections in a row you'll need to obtain a new connector instance programmatically using `Instance#get()`:
13091313

13101314
[source, java]
13111315
----
@@ -1350,6 +1354,9 @@ public class MyBean {
13501354
.onTextMessage((c, m) -> { <5>
13511355
// ...
13521356
})
1357+
.customizeOptions((connectOptions, clientOptions) -> { <6>
1358+
// ...
1359+
})
13531360
.connectAndAwait();
13541361
}
13551362
}
@@ -1359,6 +1366,7 @@ public class MyBean {
13591366
<3> The additional path that should be appended to the base URI.
13601367
<4> Set the execution model for callback handlers. By default, the callback may block the current thread. However in this case, the callback is executed on the event loop and may not block the current thread.
13611368
<5> The lambda will be called for every text message sent from the server.
1369+
<6> Optionally, customize client and connection options not configurable with the connector API or Quarkus configuration.
13621370

13631371
The basic connector is closer to a low-level API and is reserved for advanced users.
13641372
However, unlike others low-level WebSocket clients, it is still a CDI bean and can be injected in other beans.

extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/BasicConnectorContextTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.quarkus.test.QuarkusUnitTest;
1616
import io.quarkus.test.common.http.TestHTTPResource;
1717
import io.quarkus.websockets.next.BasicWebSocketConnector;
18+
import io.quarkus.websockets.next.HandshakeRequest;
1819
import io.quarkus.websockets.next.OnClose;
1920
import io.quarkus.websockets.next.OnOpen;
2021
import io.quarkus.websockets.next.WebSocket;
@@ -35,6 +36,8 @@ public class BasicConnectorContextTest {
3536

3637
static final Set<String> THREADS = ConcurrentHashMap.newKeySet();
3738

39+
static final Set<String> MESSAGES = ConcurrentHashMap.newKeySet();
40+
3841
static final CountDownLatch CLOSED_LATCH = new CountDownLatch(2);
3942

4043
@Test
@@ -46,10 +49,14 @@ void testClient() throws InterruptedException {
4649
String thread = Thread.currentThread().getName();
4750
THREADS.add(thread);
4851
MESSAGE_LATCH.countDown();
52+
MESSAGES.add(m);
4953
})
5054
.onClose((c, cr) -> {
5155
CLOSED_LATCH.countDown();
5256
})
57+
.customizeOptions(((connectOptions, clientOptions) -> {
58+
connectOptions.addHeader("Foo", "Eric");
59+
}))
5360
.baseUri(uri);
5461
WebSocketClientConnection conn1 = connector.connectAndAwait();
5562
WebSocketClientConnection conn2 = connector.connectAndAwait();
@@ -67,6 +74,8 @@ void testClient() throws InterruptedException {
6774
conn2.closeAndAwait();
6875
assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
6976
assertTrue(CLOSED_LATCH.await(5, TimeUnit.SECONDS));
77+
assertTrue(MESSAGES.stream().allMatch("Hello Eric!"::equals),
78+
() -> "Expected messages equal to 'Hello Eric!', but got " + MESSAGES);
7079
}
7180

7281
@WebSocket(path = "/end")
@@ -75,8 +84,8 @@ public static class ServerEndpoint {
7584
static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);
7685

7786
@OnOpen
78-
String open() {
79-
return "Hello!";
87+
String open(HandshakeRequest handshakeRequest) {
88+
return "Hello " + handshakeRequest.header("Foo") + "!";
8089
}
8190

8291
@OnClose

extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/BasicConnectorTest.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package io.quarkus.websockets.next.test.client;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
45
import static org.junit.jupiter.api.Assertions.assertEquals;
56
import static org.junit.jupiter.api.Assertions.assertNotNull;
67
import static org.junit.jupiter.api.Assertions.assertNull;
78
import static org.junit.jupiter.api.Assertions.assertThrows;
89
import static org.junit.jupiter.api.Assertions.assertTrue;
910

1011
import java.net.URI;
12+
import java.net.UnknownHostException;
13+
import java.time.Duration;
1114
import java.util.List;
1215
import java.util.concurrent.CopyOnWriteArrayList;
1316
import java.util.concurrent.CountDownLatch;
1417
import java.util.concurrent.TimeUnit;
1518
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.function.BiConsumer;
1620

1721
import jakarta.inject.Inject;
1822

@@ -32,6 +36,9 @@
3236
import io.quarkus.websockets.next.WebSocket;
3337
import io.quarkus.websockets.next.WebSocketClientConnection;
3438
import io.vertx.core.Context;
39+
import io.vertx.core.http.WebSocketClientOptions;
40+
import io.vertx.core.http.WebSocketConnectOptions;
41+
import io.vertx.core.net.ProxyOptions;
3542

3643
public class BasicConnectorTest {
3744

@@ -117,12 +124,35 @@ void testClient() throws InterruptedException {
117124
assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
118125

119126
CountDownLatch conn2Latch = new CountDownLatch(1);
120-
WebSocketClientConnection connection2 = BasicWebSocketConnector
127+
WebSocketClientConnection connection2 = createConnection2(conn2Latch, (o1, o2) -> {
128+
});
129+
assertNotNull(connection2);
130+
assertTrue(connection2.handshakeRequest().localAddress().matches("127\\.0\\.0\\.1:\\d+"));
131+
assertEquals("127.0.0.1:8081", connection2.handshakeRequest().remoteAddress());
132+
assertTrue(conn2Latch.await(5, TimeUnit.SECONDS));
133+
}
134+
135+
@Test
136+
void testClientOptionsCustomization() {
137+
// same connection with empty customizer succeeds in the 'testClient' test method,
138+
// together with the thrown exception, this verifies that client options customization works
139+
assertThatThrownBy(() -> createConnection2(new CountDownLatch(1), (ignored, clientOptions) -> {
140+
clientOptions.setProxyOptions(new ProxyOptions()
141+
.setHost("robert")
142+
.setPort(999)
143+
.setConnectTimeout(Duration.ofMillis(500)));
144+
})).rootCause().isInstanceOf(UnknownHostException.class).hasMessageContaining("robert");
145+
}
146+
147+
private WebSocketClientConnection createConnection2(CountDownLatch conn2Latch,
148+
BiConsumer<WebSocketConnectOptions, WebSocketClientOptions> customizer) {
149+
return BasicWebSocketConnector
121150
.create()
122151
.baseUri(uri)
123152
.path("/Cool")
124153
.executionModel(ExecutionModel.NON_BLOCKING)
125154
.addHeader("X-Test", "foo")
155+
.customizeOptions(customizer)
126156
.onTextMessage((c, m) -> {
127157
assertTrue(Context.isOnEventLoopThread());
128158
// Path params not set
@@ -132,10 +162,6 @@ void testClient() throws InterruptedException {
132162
conn2Latch.countDown();
133163
})
134164
.connectAndAwait();
135-
assertNotNull(connection2);
136-
assertTrue(connection2.handshakeRequest().localAddress().matches("127\\.0\\.0\\.1:\\d+"));
137-
assertEquals("127.0.0.1:8081", connection2.handshakeRequest().remoteAddress());
138-
assertTrue(conn2Latch.await(5, TimeUnit.SECONDS));
139165
}
140166

141167
@WebSocket(path = "/end/{name}")

extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientEndpointTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package io.quarkus.websockets.next.test.client;
22

3+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
34
import static org.junit.jupiter.api.Assertions.assertEquals;
45
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
56
import static org.junit.jupiter.api.Assertions.assertTrue;
67

78
import java.net.URI;
9+
import java.net.UnknownHostException;
10+
import java.time.Duration;
811
import java.util.List;
912
import java.util.concurrent.CopyOnWriteArrayList;
1013
import java.util.concurrent.CountDownLatch;
@@ -30,6 +33,7 @@
3033
import io.quarkus.websockets.next.WebSocketConnection;
3134
import io.quarkus.websockets.next.WebSocketConnector;
3235
import io.vertx.core.buffer.Buffer;
36+
import io.vertx.core.net.ProxyOptions;
3337

3438
public class ClientEndpointTest {
3539

@@ -68,6 +72,19 @@ void testClient() throws InterruptedException {
6872
connection.closeAndAwait();
6973
assertTrue(ClientEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
7074
assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
75+
76+
// use the same connector instance, but this time configure unknown host and expect failure
77+
assertThatThrownBy(() -> connector
78+
.baseUri(uri)
79+
.pathParam("name", "Lu=")
80+
.customizeOptions((ignored, clientOptions) -> {
81+
clientOptions.setProxyOptions(new ProxyOptions()
82+
.setHost("robert")
83+
.setPort(999)
84+
.setConnectTimeout(Duration.ofMillis(500)));
85+
})
86+
.connectAndAwait())
87+
.rootCause().isInstanceOf(UnknownHostException.class).hasMessageContaining("robert");
7188
}
7289

7390
@WebSocket(path = "/endpoint/{name}")

extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/programmatic/ClientEndpointProgrammaticTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ void testClient() throws InterruptedException {
5555
.userData(TypedKey.forInt("int"), Integer.MAX_VALUE)
5656
.userData(TypedKey.forLong("long"), Long.MAX_VALUE)
5757
.userData(TypedKey.forString("string"), "Lu")
58+
.customizeOptions((connectOptions, clientOptions) -> {
59+
connectOptions
60+
// test that port configuration is overridden
61+
.setPort(123456)
62+
// test that "Bar" header is sent with the opening handshake request
63+
.addHeader("Bar", "Adam");
64+
})
5865
.connectAndAwait();
5966
assertTrue(connection1.userData().get(TypedKey.forBoolean("boolean")));
6067
assertEquals(Integer.MAX_VALUE, connection1.userData().get(TypedKey.forInt("int")));
@@ -89,7 +96,7 @@ void testClient() throws InterruptedException {
8996
assertEquals("Ma", ClientEndpoint.CONNECTION_USER_DATA.get(connection2.id()).get(TypedKey.forString("string")));
9097

9198
assertTrue(ClientEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS));
92-
assertTrue(ClientEndpoint.MESSAGES.contains("Lu:Hello Lu!"));
99+
assertTrue(ClientEndpoint.MESSAGES.contains("Lu:Hello Lu Adam!"));
93100
assertTrue(ClientEndpoint.MESSAGES.contains("Lu:Hi!"));
94101
assertTrue(ClientEndpoint.MESSAGES.contains("Ma:Hello Ma!"));
95102
assertTrue(ClientEndpoint.MESSAGES.contains("Ma:Hi!"), ClientEndpoint.MESSAGES.toString());
@@ -107,7 +114,12 @@ public static class ServerEndpoint {
107114

108115
@OnOpen
109116
String open(HandshakeRequest handshakeRequest) {
110-
return "Hello " + handshakeRequest.header("Foo") + "!";
117+
StringBuilder result = new StringBuilder("Hello " + handshakeRequest.header("Foo"));
118+
if (handshakeRequest.header("Bar") != null) {
119+
result.append(" ").append(handshakeRequest.header("Bar"));
120+
}
121+
result.append("!");
122+
return result.toString();
111123
}
112124

113125
@OnTextMessage

extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/BasicWebSocketConnector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import io.smallrye.common.annotation.CheckReturnValue;
1515
import io.smallrye.mutiny.Uni;
1616
import io.vertx.core.buffer.Buffer;
17+
import io.vertx.core.http.WebSocketClientOptions;
18+
import io.vertx.core.http.WebSocketConnectOptions;
1719

1820
/**
1921
* A basic connector can be used to configure and open a new client connection. Unlike with {@link WebSocketConnector} a
@@ -245,4 +247,15 @@ enum ExecutionModel {
245247
VIRTUAL_THREAD,
246248
}
247249

250+
/**
251+
* This method allows to customize {@link WebSocketConnectOptions} and {@link WebSocketClientOptions}.
252+
* Connection options configured directly with this API, such as host and port, have higher priority than this customizer.
253+
* Client options configured directly with Quarkus configuration (e.g. if you configure the maximum size of a frame)
254+
* have also higher priority than this customizer.
255+
* Purpose of this customizer is to complement configuration options not configured elsewhere.
256+
*
257+
* @param customizer options customizer; must not be null
258+
* @return self
259+
*/
260+
BasicWebSocketConnector customizeOptions(BiConsumer<WebSocketConnectOptions, WebSocketClientOptions> customizer);
248261
}

extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnector.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.net.URI;
44
import java.net.URLEncoder;
5+
import java.util.function.BiConsumer;
56

67
import jakarta.enterprise.inject.Default;
78
import jakarta.enterprise.inject.Instance;
@@ -10,6 +11,8 @@
1011
import io.quarkus.websockets.next.UserData.TypedKey;
1112
import io.smallrye.common.annotation.CheckReturnValue;
1213
import io.smallrye.mutiny.Uni;
14+
import io.vertx.core.http.WebSocketClientOptions;
15+
import io.vertx.core.http.WebSocketConnectOptions;
1316

1417
/**
1518
* A connector can be used to configure and open a new client connection backed by a client endpoint that is used to
@@ -132,4 +135,16 @@ default WebSocketClientConnection connectAndAwait() {
132135
return connect().await().indefinitely();
133136
}
134137

138+
/**
139+
* This method allows to customize {@link WebSocketConnectOptions} and {@link WebSocketClientOptions}.
140+
* Connection options configured directly with this API, such as host and port, have higher priority than this customizer.
141+
* Client options configured directly with Quarkus configuration (e.g. if you configure the maximum size of a frame)
142+
* have also higher priority than this customizer.
143+
* Purpose of this customizer is to complement configuration options not configured elsewhere.
144+
*
145+
* @param customizer options customizer; must not be null
146+
* @return self
147+
*/
148+
WebSocketConnector<CLIENT> customizeOptions(BiConsumer<WebSocketConnectOptions, WebSocketClientOptions> customizer);
149+
135150
}

extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorBase.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Optional;
1414
import java.util.Set;
1515
import java.util.concurrent.TimeUnit;
16+
import java.util.function.BiConsumer;
1617
import java.util.function.Consumer;
1718
import java.util.regex.Matcher;
1819
import java.util.regex.Pattern;
@@ -43,6 +44,10 @@ abstract class WebSocketConnectorBase<THIS extends WebSocketConnectorBase<THIS>>
4344

4445
protected URI baseUri;
4546

47+
protected WebSocketConnectOptions customWebSocketConnectOptions;
48+
49+
protected WebSocketClientOptions customWebSocketClientOptions;
50+
4651
protected final Map<String, String> pathParams;
4752

4853
protected final Map<String, List<String>> headers;
@@ -83,6 +88,8 @@ abstract class WebSocketConnectorBase<THIS extends WebSocketConnectorBase<THIS>>
8388
this.path = "";
8489
this.pathParamNames = Set.of();
8590
this.userData = new HashMap<>();
91+
this.customWebSocketConnectOptions = null;
92+
this.customWebSocketClientOptions = null;
8693
}
8794

8895
public THIS baseUri(URI baseUri) {
@@ -170,7 +177,12 @@ String replacePathParameters(String path) {
170177
}
171178

172179
protected WebSocketClientOptions populateClientOptions() {
173-
WebSocketClientOptions clientOptions = new WebSocketClientOptions();
180+
final WebSocketClientOptions clientOptions;
181+
if (customWebSocketClientOptions != null) {
182+
clientOptions = new WebSocketClientOptions(customWebSocketClientOptions);
183+
} else {
184+
clientOptions = new WebSocketClientOptions();
185+
}
174186
if (config.offerPerMessageCompression()) {
175187
clientOptions.setTryUsePerMessageCompression(true);
176188
if (config.compressionLevel().isPresent()) {
@@ -210,7 +222,13 @@ protected WebSocketClientOptions populateClientOptions() {
210222
}
211223

212224
protected WebSocketConnectOptions newConnectOptions(URI serverEndpointUri) {
213-
WebSocketConnectOptions connectOptions = new WebSocketConnectOptions()
225+
final WebSocketConnectOptions connectOptions;
226+
if (customWebSocketConnectOptions != null) {
227+
connectOptions = new WebSocketConnectOptions(customWebSocketConnectOptions);
228+
} else {
229+
connectOptions = new WebSocketConnectOptions();
230+
}
231+
connectOptions
214232
.setSsl(isSecure(serverEndpointUri))
215233
.setHost(serverEndpointUri.getHost());
216234
if (serverEndpointUri.getPort() != -1) {
@@ -249,4 +267,15 @@ public void accept(WebSocketClientConnection conn) {
249267
};
250268
}
251269

270+
public THIS customizeOptions(BiConsumer<WebSocketConnectOptions, WebSocketClientOptions> customizer) {
271+
Objects.requireNonNull(customizer, "Options customizer must not be null");
272+
if (customWebSocketClientOptions == null) {
273+
customWebSocketClientOptions = new WebSocketClientOptions();
274+
}
275+
if (customWebSocketConnectOptions == null) {
276+
customWebSocketConnectOptions = new WebSocketConnectOptions();
277+
}
278+
customizer.accept(customWebSocketConnectOptions, customWebSocketClientOptions);
279+
return self();
280+
}
252281
}

0 commit comments

Comments
 (0)