Skip to content

Commit 9971356

Browse files
committed
Test against WebSocket
1 parent f006302 commit 9971356

File tree

9 files changed

+89
-28
lines changed

9 files changed

+89
-28
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,13 @@ private NativeConnectionWrapper connect(
255255
sslOptions.verifyHost(tlsSettings.isHostnameVerification());
256256
}
257257
Address address = connectionSettings.selectAddress(addresses);
258+
if (connectionSettings.useWebSocket()) {
259+
LOGGER.trace("Using WebSocket for connection '{}'", this.name());
260+
connectionOptions
261+
.transportOptions()
262+
.useWebSockets(true)
263+
.webSocketPath(connectionSettings.webSocketPath());
264+
}
258265
StopWatch stopWatch = new StopWatch();
259266
try {
260267
LOGGER.trace("Connecting '{}' to {}...", this.name(), address);

src/main/java/com/rabbitmq/client/amqp/impl/DefaultConnectionSettings.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,14 @@ abstract class DefaultConnectionSettings<T> implements ConnectionSettings<T> {
5252
static final String DEFAULT_HOST = "localhost";
5353
static final int DEFAULT_PORT = 5672;
5454
static final int DEFAULT_TLS_PORT = 5671;
55+
static final int DEFAULT_WEB_SOCKET_PORT = 15678;
56+
static final int DEFAULT_WEB_SOCKET_TLS_PORT = 15677;
5557
static final String DEFAULT_VIRTUAL_HOST = "/";
5658

5759
private String host = DEFAULT_HOST;
5860
private int port = DEFAULT_PORT;
61+
private boolean useWebSocket = false;
62+
private String webSocketPath = "/ws";
5963
private CredentialsProvider credentialsProvider;
6064
private String virtualHost = DEFAULT_VIRTUAL_HOST;
6165
private List<URI> uris = Collections.emptyList();
@@ -135,6 +139,11 @@ public T port(int port) {
135139
return toReturn();
136140
}
137141

142+
T useWebSocket(boolean useWebSocket) {
143+
this.useWebSocket = useWebSocket;
144+
return toReturn();
145+
}
146+
138147
@Override
139148
public T virtualHost(String virtualHost) {
140149
this.virtualHost = virtualHost;
@@ -176,6 +185,14 @@ public T saslMechanism(String mechanism) {
176185
return this.toReturn();
177186
}
178187

188+
boolean useWebSocket() {
189+
return this.useWebSocket;
190+
}
191+
192+
String webSocketPath() {
193+
return this.webSocketPath;
194+
}
195+
179196
CredentialsProvider credentialsProvider() {
180197
return credentialsProvider;
181198
}
@@ -213,6 +230,8 @@ DefaultTlsSettings<?> tlsSettings() {
213230
void copyTo(DefaultConnectionSettings<?> copy) {
214231
copy.host(this.host);
215232
copy.port(this.port);
233+
copy.useWebSocket(this.useWebSocket);
234+
copy.webSocketPath = this.webSocketPath;
216235
copy.saslMechanism(this.saslMechanism);
217236
copy.credentialsProvider(this.credentialsProvider);
218237
copy.virtualHost(this.virtualHost);
@@ -234,8 +253,9 @@ void copyTo(DefaultConnectionSettings<?> copy) {
234253
DefaultConnectionSettings<?> consolidate() {
235254
if (this.uris.isEmpty()) {
236255
int p = this.port;
237-
if (this.tlsEnabled() && this.port == DEFAULT_PORT) {
238-
p = DEFAULT_TLS_PORT;
256+
if (this.tlsEnabled()
257+
&& (this.port == DEFAULT_PORT || this.port == DEFAULT_WEB_SOCKET_PORT)) {
258+
p = this.useWebSocket ? DEFAULT_WEB_SOCKET_TLS_PORT : DEFAULT_TLS_PORT;
239259
}
240260
this.addresses.add(new Address(this.host, p));
241261
} else {
@@ -275,7 +295,7 @@ DefaultConnectionSettings<?> consolidate() {
275295
this.tlsEnabled()
276296
|| this.uris.stream().anyMatch(u -> u.getScheme().equalsIgnoreCase("amqps"));
277297

278-
int defaultPort = tls ? DEFAULT_TLS_PORT : DEFAULT_PORT;
298+
int defaultPort = tls ? defaultTlsPort() : defaultPort();
279299
List<Address> addrs =
280300
this.uris.stream()
281301
.map(
@@ -290,6 +310,14 @@ DefaultConnectionSettings<?> consolidate() {
290310
return this;
291311
}
292312

313+
private int defaultPort() {
314+
return this.useWebSocket ? DEFAULT_WEB_SOCKET_PORT : DEFAULT_PORT;
315+
}
316+
317+
private int defaultTlsPort() {
318+
return this.useWebSocket ? DEFAULT_WEB_SOCKET_TLS_PORT : DEFAULT_TLS_PORT;
319+
}
320+
293321
@Override
294322
public TlsSettings<T> tls() {
295323
this.tlsSettings.enable();

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
2222
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
2323
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
24-
import static com.rabbitmq.client.amqp.impl.TestUtils.name;
25-
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
24+
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2625
import static java.time.Duration.ofMillis;
2726
import static java.util.Arrays.stream;
2827
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +48,7 @@ public class AmqpConnectionRecoveryTest {
4948
@BeforeAll
5049
static void initAll() {
5150
DefaultConnectionSettings<?> connectionSettings = DefaultConnectionSettings.instance();
51+
maybeConfigureWebSocket(connectionSettings);
5252
connectionSettings.addressSelector(
5353
addresses -> {
5454
connectionAttemptCount.incrementAndGet();

src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ void management(TestInfo info) throws Exception {
261261
@Test
262262
void queueDeletionImpactOnReceiver(TestInfo info) throws Exception {
263263
String queue = name(info);
264-
try (Environment env = new AmqpEnvironmentBuilder().build();
264+
try (Environment env = TestUtils.environmentBuilder().build();
265265
com.rabbitmq.client.amqp.Connection connection = env.connectionBuilder().build();
266266
Client client = client()) {
267267
connection.management().queue().name(queue).declare();
@@ -285,7 +285,7 @@ void queueDeletionImpactOnReceiver(TestInfo info) throws Exception {
285285
@Test
286286
void exchangeDeletionImpactOnSender(TestInfo info) throws Exception {
287287
String exchange = name(info);
288-
try (Environment env = new AmqpEnvironmentBuilder().build();
288+
try (Environment env = TestUtils.environmentBuilder().build();
289289
com.rabbitmq.client.amqp.Connection connection = env.connectionBuilder().build();
290290
Client client = client()) {
291291
connection.management().exchange().name(exchange).type(FANOUT).declare();

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void init(TestInfo info) {
6060
this.q = name(info);
6161
this.name = name(info);
6262
environment =
63-
new AmqpEnvironmentBuilder()
63+
TestUtils.environmentBuilder()
6464
.connectionSettings()
6565
.addressSelector(new RoundRobinAddressSelector())
6666
.uris(URIS)

src/test/java/com/rabbitmq/client/amqp/impl/MicrometerObservationCollectorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class PublishConsume extends IntegrationTest {
5151
public SampleTestRunner.SampleTestRunnerConsumer yourCode() {
5252
return (buildingBlocks, meterRegistry) -> {
5353
try (Environment env =
54-
new AmqpEnvironmentBuilder()
54+
TestUtils.environmentBuilder()
5555
.observationCollector(
5656
new MicrometerObservationCollectorBuilder()
5757
.registry(getObservationRegistry())
@@ -109,7 +109,7 @@ public SampleTestRunner.SampleTestRunnerConsumer yourCode() {
109109
.hasTag("messaging.message.id", messageId.toString())
110110
.hasTag("messaging.message.conversation_id", String.valueOf(correlationId))
111111
.hasTagWithKey("net.sock.peer.addr")
112-
.hasTag("net.sock.peer.port", "5672")
112+
.hasTag("net.sock.peer.port", String.valueOf(TestUtils.defaultPort()))
113113
.hasTag("net.protocol.name", "amqp")
114114
.hasTag("net.protocol.version", "1.0");
115115

src/test/java/com/rabbitmq/client/amqp/impl/Oauth2Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ void tokenShouldBeRefreshedAutomatically(boolean shared, TestInfo info) throws E
180180
this.server = startServer(port, contextPath, httpHandler);
181181

182182
String uri = "http://localhost:" + port + contextPath;
183-
AmqpEnvironmentBuilder envBuilder = new AmqpEnvironmentBuilder();
183+
AmqpEnvironmentBuilder envBuilder = TestUtils.environmentBuilder();
184184
DefaultOAuth2Settings<? extends EnvironmentConnectionSettings> oauth2 =
185185
(DefaultOAuth2Settings<? extends EnvironmentConnectionSettings>)
186186
envBuilder.connectionSettings().oauth2();
@@ -293,7 +293,7 @@ void oauthConfigurationShouldUsePlainSaslMechanism() throws Exception {
293293

294294
String uri = "http://localhost:" + port + contextPath;
295295
try (Environment env =
296-
new AmqpEnvironmentBuilder()
296+
TestUtils.environmentBuilder()
297297
.connectionSettings()
298298
.oauth2()
299299
.tokenEndpointUri(uri)

src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public abstract class TestUtils {
5454

5555
static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10);
5656
static final Duration DEFAULT_WAIT_TIME = Duration.ofMillis(100);
57+
private static final boolean USE_WEB_SOCKET =
58+
Boolean.parseBoolean(System.getProperty("rabbitmq.use.web.socket", "false"));
5759

5860
private TestUtils() {}
5961

@@ -255,8 +257,11 @@ static Connection connection(
255257
connectionOptions.properties(singletonMap("connection_name", name));
256258
}
257259
optionsCallback.accept(connectionOptions);
260+
if (useWebSocket()) {
261+
connectionOptions.transportOptions().useWebSockets(true).webSocketPath("/ws");
262+
}
258263
try {
259-
return client.connect("localhost", 5672, connectionOptions);
264+
return client.connect("localhost", TestUtils.defaultPort(), connectionOptions);
260265
} catch (ClientException e) {
261266
throw new RuntimeException(e);
262267
}
@@ -271,7 +276,7 @@ static Proxy toxiproxy(ToxiproxyClient client, String name, int proxyPort) throw
271276
client.createProxy(
272277
name,
273278
"localhost:" + proxyPort,
274-
DefaultConnectionSettings.DEFAULT_HOST + ":" + DefaultConnectionSettings.DEFAULT_PORT);
279+
DefaultConnectionSettings.DEFAULT_HOST + ":" + defaultPort());
275280
return proxy;
276281
}
277282

@@ -288,7 +293,16 @@ static UnsignedLong ulong(long value) {
288293
}
289294

290295
public static AmqpEnvironmentBuilder environmentBuilder() {
291-
return new AmqpEnvironmentBuilder();
296+
AmqpEnvironmentBuilder builder = new AmqpEnvironmentBuilder();
297+
maybeConfigureWebSocket((DefaultConnectionSettings<?>) builder.connectionSettings());
298+
return builder;
299+
}
300+
301+
static void maybeConfigureWebSocket(DefaultConnectionSettings<?> connectionSettings) {
302+
if (useWebSocket()) {
303+
connectionSettings.useWebSocket(true);
304+
connectionSettings.port(DefaultConnectionSettings.DEFAULT_WEB_SOCKET_PORT);
305+
}
292306
}
293307

294308
@SuppressWarnings("unchecked")
@@ -307,7 +321,9 @@ static void storeToxiproxyClient(ToxiproxyClient client, ExtensionContext contex
307321
}
308322

309323
static boolean tlsAvailable() {
310-
return Cli.rabbitmqctl("status").output().contains("amqp/ssl");
324+
String status = Cli.rabbitmqctl("status").output();
325+
String shouldContain = useWebSocket() ? "https/web-amqp" : "amqp/ssl";
326+
return status.contains(shouldContain);
311327
}
312328

313329
static boolean addressV1Permitted() {
@@ -321,6 +337,16 @@ static boolean isCluster() {
321337
return !Cli.rabbitmqctl("eval 'nodes().'").output().replace("[", "").replace("]", "").isBlank();
322338
}
323339

340+
static boolean useWebSocket() {
341+
return USE_WEB_SOCKET;
342+
}
343+
344+
static int defaultPort() {
345+
return useWebSocket()
346+
? DefaultConnectionSettings.DEFAULT_WEB_SOCKET_PORT
347+
: DefaultConnectionSettings.DEFAULT_PORT;
348+
}
349+
324350
static class DisabledIfTlsNotEnabledCondition implements ExecutionCondition {
325351

326352
@Override

src/test/resources/logback-test.xml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@
1010
<logger name="com.rabbitmq.client.amqp" level="warn" />
1111
<logger name="org.apache.qpid" level="error" />
1212
<logger name="com.rabbitmq.client.amqp.perf" level="info" />
13-
<logger name="com.rabbitmq.client.amqp.impl.ConnectionUtils" level="debug" />
14-
<logger name="com.rabbitmq.client.amqp.impl.AmqpConnection" level="debug" />
15-
<logger name="com.rabbitmq.client.amqp.impl.RecoveryClusterTest" level="info" />
16-
<logger name="com.rabbitmq.client.amqp.impl.RecordingTopologyListener" level="warn" />
17-
<logger name="com.rabbitmq.client.amqp.impl.ConnectionUtils" level="warn" />
18-
<logger name="com.rabbitmq.client.amqp.impl.AsyncRetry" level="debug" />
19-
<logger name="com.rabbitmq.client.amqp.impl.EntityRecovery" level="warn" />
20-
<logger name="com.rabbitmq.client.amqp.impl.EventLoop" level="warn" />
21-
<logger name="com.rabbitmq.client.amqp.impl.AmqpManagement" level="debug" />
22-
<logger name="com.rabbitmq.client.amqp.impl.AmqpPublisher" level="warn" />
23-
<logger name="com.rabbitmq.client.amqp.impl.AmqpConsumer" level="warn" />
24-
<logger name="com.rabbitmq.client.amqp.impl.RetryUtils" level="debug" />
13+
<!-- <logger name="com.rabbitmq.client.amqp.impl.ConnectionUtils" level="debug" />-->
14+
<!-- <logger name="com.rabbitmq.client.amqp.impl.AmqpConnection" level="debug" />-->
15+
<!-- <logger name="com.rabbitmq.client.amqp.impl.RecoveryClusterTest" level="info" />-->
16+
<!-- <logger name="com.rabbitmq.client.amqp.impl.RecordingTopologyListener" level="warn" />-->
17+
<!-- <logger name="com.rabbitmq.client.amqp.impl.ConnectionUtils" level="warn" />-->
18+
<!-- <logger name="com.rabbitmq.client.amqp.impl.AsyncRetry" level="debug" />-->
19+
<!-- <logger name="com.rabbitmq.client.amqp.impl.EntityRecovery" level="warn" />-->
20+
<!-- <logger name="com.rabbitmq.client.amqp.impl.EventLoop" level="warn" />-->
21+
<!-- <logger name="com.rabbitmq.client.amqp.impl.AmqpManagement" level="debug" />-->
22+
<!-- <logger name="com.rabbitmq.client.amqp.impl.AmqpPublisher" level="warn" />-->
23+
<!-- <logger name="com.rabbitmq.client.amqp.impl.AmqpConsumer" level="warn" />-->
24+
<!-- <logger name="com.rabbitmq.client.amqp.impl.RetryUtils" level="debug" />-->
2525

2626
<root level="warn">
2727
<appender-ref ref="STDOUT" />

0 commit comments

Comments
 (0)