Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 425f4d6

Browse files
committed
Allows test suite to run against a RabbitMQ container
1 parent 96ea7d6 commit 425f4d6

File tree

3 files changed

+60
-23
lines changed

3 files changed

+60
-23
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,17 @@ can specify the path to `rabbitmqctl` like the following:
132132

133133
./gradlew check -Drabbitmqctl.bin=/path/to/rabbitmqctl
134134

135-
You need a local running RabbitMQ instance.
135+
You need a local running RabbitMQ instance.
136+
137+
### Running tests with Docker
138+
139+
Start a RabbitMQ container:
140+
141+
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8
142+
143+
Run the test suite:
144+
145+
./gradlew check -i -s -Drabbitmqctl.bin=DOCKER:rabbitmq
136146

137147
### Building IDE project
138148
./gradlew eclipse

src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.junit.jupiter.api.AfterEach;
2323
import org.junit.jupiter.api.BeforeEach;
2424
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.TestInfo;
2526
import org.junit.jupiter.params.ParameterizedTest;
2627
import org.junit.jupiter.params.provider.MethodSource;
2728
import org.slf4j.Logger;
@@ -85,7 +86,7 @@ private static void wait(CountDownLatch latch) throws InterruptedException {
8586
}
8687

8788
@BeforeEach
88-
public void init() throws Exception {
89+
public void init(TestInfo info) throws Exception {
8990
ConnectionFactory connectionFactory = new ConnectionFactory();
9091
connectionFactory.useNio();
9192
connectionFactory.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
@@ -96,7 +97,9 @@ public void init() throws Exception {
9697
channel.close();
9798
receiver = null;
9899
sender = null;
99-
connectionMono = Mono.just(connectionFactory.newConnection()).cache();
100+
connectionMono = Mono.just(connectionFactory.newConnection(
101+
info.getTestMethod().get().getName()))
102+
.cache();
100103
}
101104

102105
@AfterEach
@@ -142,7 +145,7 @@ public void consumeConsumerShouldRecoverAutomatically(BiFunction<Receiver, Strin
142145
}
143146
});
144147

145-
closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
148+
closeAndWaitForRecovery(connectionMono.block());
146149

147150
for (int $$ : IntStream.range(0, nbMessages).toArray()) {
148151
channel.basicPublish("", queue, null, "Hello".getBytes());
@@ -295,7 +298,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
295298
)))
296299
.subscribe();
297300

298-
closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
301+
closeAndWaitForRecovery(connectionMono.block());
299302

300303
assertTrue(latch.await(10, TimeUnit.SECONDS));
301304
assertEquals(nbMessages, counter.get());
@@ -345,7 +348,7 @@ public void sendWithPublishConfirmsAllMessagesShouldBeSentConfirmedAndConsumed()
345348
}
346349
});
347350

348-
closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
351+
closeAndWaitForRecovery(connectionMono.block());
349352

350353
// we expect all messages to make to the queue (they're retried)
351354
assertTrue(consumedLatch.await(20, TimeUnit.SECONDS));
@@ -382,15 +385,15 @@ public void topologyRecovery() throws Exception {
382385

383386
latch.set(new CountDownLatch(1));
384387

385-
closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
388+
closeAndWaitForRecovery(connectionMono.block());
386389

387390
ch.basicPublish(e, "", null, "".getBytes());
388391
assertTrue(latch.get().await(5, TimeUnit.SECONDS));
389392
}
390393

391-
private void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException {
394+
private void closeAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
392395
CountDownLatch latch = prepareForRecovery(connection);
393-
Host.closeConnection((NetworkConnection) connection);
396+
Host.closeConnection(connection);
394397
wait(latch);
395398
}
396399

src/test/java/reactor/rabbitmq/Host.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package reactor.rabbitmq;
1818

19-
import com.rabbitmq.client.impl.NetworkConnection;
19+
import com.rabbitmq.client.Connection;
2020

2121
import java.io.BufferedReader;
2222
import java.io.IOException;
@@ -87,23 +87,34 @@ public static Process rabbitmqctl(String command) throws IOException {
8787
}
8888

8989
public static String rabbitmqctlCommand() {
90-
return System.getProperty("rabbitmqctl.bin");
90+
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin", System.getenv("RABBITMQCTL_BIN"));
91+
if (rabbitmqCtl == null) {
92+
throw new IllegalStateException("Please define the rabbitmqctl.bin system property or "
93+
+ "the RABBITMQCTL_BIN environment variable");
94+
}
95+
if (rabbitmqCtl.startsWith("DOCKER:")) {
96+
String containerId = rabbitmqCtl.split(":")[1];
97+
return "docker exec " + containerId + " rabbitmqctl";
98+
} else {
99+
return rabbitmqCtl;
100+
}
91101
}
92102

93-
public static void closeConnection(String pid) throws IOException {
103+
private static void closeConnection(String pid) throws IOException {
94104
rabbitmqctl("close_connection '" + pid + "' 'Closed via rabbitmqctl'");
95105
}
96106

97-
public static void closeConnection(NetworkConnection c) throws IOException {
107+
public static void closeConnection(Connection c) throws IOException {
98108
Host.ConnectionInfo ci = findConnectionInfoFor(Host.listConnections(), c);
99109
closeConnection(ci.getPid());
100110
}
101111

102112
public static List<ConnectionInfo> listConnections() throws IOException {
103-
String output = capture(rabbitmqctl("list_connections -q pid peer_port").getInputStream());
113+
String output = capture(rabbitmqctl("list_connections -q pid peer_port client_properties")
114+
.getInputStream());
104115
// output (header line presence depends on broker version):
105116
// pid peer_port
106-
// <[email protected]> 58713
117+
// <[email protected]> 58713 [{"product","RabbitMQ"},{"...
107118
String[] allLines = output.split("\n");
108119

109120
ArrayList<ConnectionInfo> result = new ArrayList<ConnectionInfo>();
@@ -112,18 +123,31 @@ public static List<ConnectionInfo> listConnections() throws IOException {
112123
String[] columns = line.split("\t");
113124
// can be also header line, so ignoring NumberFormatException
114125
try {
115-
result.add(new ConnectionInfo(columns[0], Integer.valueOf(columns[1])));
126+
Integer.valueOf(columns[1]); // just to ignore header line
127+
result.add(new ConnectionInfo(columns[0], connectionName(columns[2])));
116128
} catch (NumberFormatException e) {
117129
// OK
118130
}
119131
}
120132
return result;
121133
}
122134

123-
private static Host.ConnectionInfo findConnectionInfoFor(List<ConnectionInfo> xs, NetworkConnection c) {
135+
private static String connectionName(String clientProperties) {
136+
String beginning = "\"connection_name\",\"";
137+
int begin = clientProperties.indexOf(beginning);
138+
if (begin > 0) {
139+
int start = clientProperties.indexOf(beginning) + beginning.length();
140+
int end = clientProperties.indexOf("\"", start);
141+
return clientProperties.substring(start, end);
142+
} else {
143+
return null;
144+
}
145+
}
146+
147+
private static Host.ConnectionInfo findConnectionInfoFor(List<ConnectionInfo> xs, Connection c) {
124148
Host.ConnectionInfo result = null;
125149
for (Host.ConnectionInfo ci : xs) {
126-
if (c.getLocalPort() == ci.getPeerPort()) {
150+
if (c.getClientProvidedName().equals(ci.getName())) {
127151
result = ci;
128152
break;
129153
}
@@ -134,19 +158,19 @@ private static Host.ConnectionInfo findConnectionInfoFor(List<ConnectionInfo> xs
134158
public static class ConnectionInfo {
135159

136160
private final String pid;
137-
private final int peerPort;
161+
private final String name;
138162

139-
public ConnectionInfo(String pid, int peerPort) {
163+
public ConnectionInfo(String pid, String name) {
140164
this.pid = pid;
141-
this.peerPort = peerPort;
165+
this.name = name;
142166
}
143167

144168
public String getPid() {
145169
return pid;
146170
}
147171

148-
public int getPeerPort() {
149-
return peerPort;
172+
public String getName() {
173+
return name;
150174
}
151175
}
152176
}

0 commit comments

Comments
 (0)