Skip to content

Commit c46aec2

Browse files
infoShareTomasz Foryseddumelendez
authored
Add support for queue in solace (#10450)
Fixes #10370 --------- Co-authored-by: Tomasz Forys <[email protected]> Co-authored-by: Eddú Meléndez Gonzales <[email protected]>
1 parent 57c2467 commit c46aec2

File tree

6 files changed

+122
-38
lines changed

6 files changed

+122
-38
lines changed

modules/solace/src/main/java/org/testcontainers/solace/Service.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,25 @@
44
* Services that are supported by Testcontainers implementation
55
*/
66
public enum Service {
7+
/**
8+
* Advanced Message Queuing Protocol
9+
*/
710
AMQP("amqp", 5672, "amqp", false),
11+
/**
12+
* Message Queuing Telemetry Transport
13+
*/
814
MQTT("mqtt", 1883, "tcp", false),
15+
/**
16+
* Representational State Transfer
17+
*/
918
REST("rest", 9000, "http", false),
19+
/**
20+
* Solace Message Format
21+
*/
1022
SMF("smf", 55555, "tcp", true),
23+
/**
24+
* Solace Message Format with SSL
25+
*/
1126
SMF_SSL("smf", 55443, "tcps", true);
1227

1328
private final String name;

modules/solace/src/main/java/org/testcontainers/solace/SolaceContainer.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,19 @@ public SolaceContainer(String dockerImageName) {
6565
this(DockerImageName.parse(dockerImageName));
6666
}
6767

68+
/**
69+
* Create a new solace container with the specified docker image.
70+
*
71+
* @param dockerImageName the image name that should be used.
72+
*/
6873
public SolaceContainer(DockerImageName dockerImageName) {
6974
super(dockerImageName);
7075
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
7176
withCreateContainerCmdModifier(cmd -> {
72-
cmd.getHostConfig().withShmSize(SHM_SIZE).withUlimits(new Ulimit[] { new Ulimit("nofile", 2448L, 6592L) });
77+
cmd
78+
.getHostConfig()
79+
.withShmSize(SHM_SIZE)
80+
.withUlimits(new Ulimit[] { new Ulimit("nofile", 2448L, 1048576L) });
7381
});
7482
this.waitStrategy = Wait.forLogMessage(SOLACE_READY_MESSAGE, 1).withStartupTimeout(Duration.ofSeconds(60));
7583
withExposedPorts(8080);
@@ -103,6 +111,17 @@ private Transferable createConfigurationScript() {
103111
updateConfigScript(scriptBuilder, "create message-vpn " + vpn);
104112
updateConfigScript(scriptBuilder, "no shutdown");
105113
updateConfigScript(scriptBuilder, "exit");
114+
updateConfigScript(scriptBuilder, "client-profile default message-vpn " + vpn);
115+
updateConfigScript(scriptBuilder, "message-spool");
116+
updateConfigScript(scriptBuilder, "allow-guaranteed-message-send");
117+
updateConfigScript(scriptBuilder, "allow-guaranteed-message-receive");
118+
updateConfigScript(scriptBuilder, "allow-guaranteed-endpoint-create");
119+
updateConfigScript(scriptBuilder, "allow-guaranteed-endpoint-create-durability all");
120+
updateConfigScript(scriptBuilder, "exit");
121+
updateConfigScript(scriptBuilder, "exit");
122+
updateConfigScript(scriptBuilder, "message-spool message-vpn " + vpn);
123+
updateConfigScript(scriptBuilder, "max-spool-usage 60000");
124+
updateConfigScript(scriptBuilder, "exit");
106125
}
107126

108127
// Configure username and password

modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerAMQPTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class SolaceContainerAMQPTest {
3131
@Test
3232
public void testSolaceContainer() throws JMSException {
3333
try (
34-
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
34+
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
3535
.withTopic(TOPIC_NAME, Service.AMQP)
3636
.withVpn("amqp-vpn")
3737
) {

modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerMQTTTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class SolaceContainerMQTTTest {
2727
@Test
2828
public void testSolaceContainer() {
2929
try (
30-
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
30+
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
3131
.withTopic(TOPIC_NAME, Service.MQTT)
3232
.withVpn("mqtt-vpn")
3333
) {

modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerRESTTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class SolaceContainerRESTTest {
2828
@Test
2929
public void testSolaceContainer() throws IOException {
3030
try (
31-
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
31+
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
3232
.withTopic(TOPIC_NAME, Service.REST)
3333
.withVpn("rest-vpn")
3434
) {

modules/solace/src/test/java/org/testcontainers/solace/SolaceContainerSMFTest.java

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package org.testcontainers.solace;
22

33
import com.solacesystems.jcsmp.BytesXMLMessage;
4+
import com.solacesystems.jcsmp.ConsumerFlowProperties;
5+
import com.solacesystems.jcsmp.EndpointProperties;
46
import com.solacesystems.jcsmp.JCSMPException;
57
import com.solacesystems.jcsmp.JCSMPFactory;
68
import com.solacesystems.jcsmp.JCSMPProperties;
79
import com.solacesystems.jcsmp.JCSMPSession;
810
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
11+
import com.solacesystems.jcsmp.Queue;
912
import com.solacesystems.jcsmp.TextMessage;
1013
import com.solacesystems.jcsmp.Topic;
1114
import com.solacesystems.jcsmp.XMLMessageConsumer;
@@ -30,40 +33,75 @@ public class SolaceContainerSMFTest {
3033

3134
private static final Topic TOPIC = JCSMPFactory.onlyInstance().createTopic("Topic/ActualTopic");
3235

36+
private static final Queue QUEUE = JCSMPFactory.onlyInstance().createQueue("Queue");
37+
3338
@Test
3439
public void testSolaceContainerWithSimpleAuthentication() {
3540
try (
3641
// solaceContainerSetup {
37-
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
42+
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
3843
.withCredentials("user", "pass")
39-
.withTopic("Topic/ActualTopic", Service.SMF)
44+
.withTopic(TOPIC.getName(), Service.SMF)
4045
.withVpn("test_vpn")
4146
// }
4247
) {
4348
solaceContainer.start();
4449
JCSMPSession session = createSessionWithBasicAuth(solaceContainer);
4550
assertThat(session).isNotNull();
46-
assertThat(consumeMessageFromSolace(session)).isEqualTo(MESSAGE);
51+
consumeMessageFromTopics(session);
4752
session.closeSession();
4853
}
4954
}
5055

56+
@Test
57+
public void testSolaceContainerWithCreateFlow() {
58+
try (
59+
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
60+
.withCredentials("user", "pass")
61+
.withTopic(TOPIC.getName(), Service.SMF)
62+
.withVpn("test_vpn")
63+
) {
64+
solaceContainer.start();
65+
JCSMPSession session = createSessionWithBasicAuth(solaceContainer);
66+
assertThat(session).isNotNull();
67+
testCreateFlow(session);
68+
session.closeSession();
69+
}
70+
}
71+
72+
private static void testCreateFlow(JCSMPSession session) {
73+
try {
74+
EndpointProperties endpointProperties = new EndpointProperties();
75+
endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);
76+
endpointProperties.setQuota(1000);
77+
session.provision(QUEUE, endpointProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
78+
session.addSubscription(QUEUE, TOPIC, JCSMPSession.WAIT_FOR_CONFIRM);
79+
ConsumerFlowProperties flowProperties = new ConsumerFlowProperties().setEndpoint(QUEUE);
80+
TestConsumer listener = new TestConsumer();
81+
session.createFlow(listener, flowProperties).start();
82+
publishMessageToSolaceTopic(session);
83+
listener.waitForMessage();
84+
} catch (Exception e) {
85+
throw new RuntimeException("Cannot process message using solace topic/queue: " + e.getMessage(), e);
86+
}
87+
}
88+
5189
@Test
5290
public void testSolaceContainerWithCertificates() {
5391
try (
5492
// solaceContainerUsageSSL {
55-
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.6")
93+
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
5694
.withClientCert(
5795
MountableFile.forClasspathResource("solace.pem"),
5896
MountableFile.forClasspathResource("rootCA.crt")
5997
)
60-
.withTopic("Topic/ActualTopic", Service.SMF_SSL)
98+
.withTopic(TOPIC.getName(), Service.SMF_SSL)
6199
// }
62100
) {
63101
solaceContainer.start();
64102
JCSMPSession session = createSessionWithCertificates(solaceContainer);
65103
assertThat(session).isNotNull();
66-
assertThat(consumeMessageFromSolace(session)).isEqualTo(MESSAGE);
104+
consumeMessageFromTopics(session);
67105
session.closeSession();
68106
}
69107
}
@@ -112,7 +150,7 @@ private static JCSMPSession createSession(JCSMPProperties properties) {
112150
}
113151
}
114152

115-
private void publishMessageToSolace(JCSMPSession session) throws JCSMPException {
153+
private static void publishMessageToSolaceTopic(JCSMPSession session) throws JCSMPException {
116154
XMLMessageProducer producer = session.getMessageProducer(
117155
new JCSMPStreamingPublishCorrelatingEventHandler() {
118156
@Override
@@ -131,37 +169,49 @@ public void handleErrorEx(Object o, JCSMPException e, long l) {
131169
producer.send(msg, TOPIC);
132170
}
133171

134-
private String consumeMessageFromSolace(JCSMPSession session) {
135-
CountDownLatch latch = new CountDownLatch(1);
172+
private static void consumeMessageFromTopics(JCSMPSession session) {
136173
try {
137-
String[] result = new String[1];
138-
XMLMessageConsumer cons = session.getMessageConsumer(
139-
new XMLMessageListener() {
140-
@Override
141-
public void onReceive(BytesXMLMessage msg) {
142-
if (msg instanceof TextMessage) {
143-
TextMessage textMessage = (TextMessage) msg;
144-
String message = textMessage.getText();
145-
result[0] = message;
146-
LOGGER.info("TextMessage received: " + message);
147-
}
148-
latch.countDown();
149-
}
150-
151-
@Override
152-
public void onException(JCSMPException e) {
153-
LOGGER.error("Exception received: " + e.getMessage());
154-
latch.countDown();
155-
}
156-
}
157-
);
174+
TestConsumer listener = new TestConsumer();
175+
XMLMessageConsumer cons = session.getMessageConsumer(listener);
158176
session.addSubscription(TOPIC);
159177
cons.start();
160-
publishMessageToSolace(session);
161-
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
162-
return result[0];
178+
publishMessageToSolaceTopic(session);
179+
listener.waitForMessage();
163180
} catch (Exception e) {
164-
throw new RuntimeException("Cannot receive message from solace", e);
181+
throw new RuntimeException("Cannot process message using solace: " + e.getMessage(), e);
182+
}
183+
}
184+
185+
static class TestConsumer implements XMLMessageListener {
186+
187+
private final CountDownLatch latch = new CountDownLatch(1);
188+
189+
private String result;
190+
191+
@Override
192+
public void onReceive(BytesXMLMessage msg) {
193+
if (msg instanceof TextMessage) {
194+
TextMessage textMessage = (TextMessage) msg;
195+
String message = textMessage.getText();
196+
result = message;
197+
LOGGER.info("Message received: " + message);
198+
}
199+
latch.countDown();
200+
}
201+
202+
@Override
203+
public void onException(JCSMPException e) {
204+
LOGGER.error("Exception received: " + e.getMessage());
205+
latch.countDown();
206+
}
207+
208+
private void waitForMessage() {
209+
try {
210+
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
211+
assertThat(result).isEqualTo(MESSAGE);
212+
} catch (Exception e) {
213+
throw new RuntimeException("Cannot receive message from solace: " + e.getMessage(), e);
214+
}
165215
}
166216
}
167217
}

0 commit comments

Comments
 (0)