Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@
* Services that are supported by Testcontainers implementation
*/
public enum Service {
/**
* Advanced Message Queuing Protocol
*/
AMQP("amqp", 5672, "amqp", false),
/**
* Message Queuing Telemetry Transport
*/
MQTT("mqtt", 1883, "tcp", false),
/**
* Representational State Transfer
*/
REST("rest", 9000, "http", false),
/**
* Solace Message Format
*/
SMF("smf", 55555, "tcp", true),
/**
* Solace Message Format with SSL
*/
SMF_SSL("smf", 55443, "tcps", true);

private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,19 @@ public SolaceContainer(String dockerImageName) {
this(DockerImageName.parse(dockerImageName));
}

/**
* Create a new solace container with the specified docker image.
*
* @param dockerImageName the image name that should be used.
*/
public SolaceContainer(DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withShmSize(SHM_SIZE).withUlimits(new Ulimit[] { new Ulimit("nofile", 2448L, 6592L) });
cmd
.getHostConfig()
.withShmSize(SHM_SIZE)
.withUlimits(new Ulimit[] { new Ulimit("nofile", 2448L, 1048576L) });
});
this.waitStrategy = Wait.forLogMessage(SOLACE_READY_MESSAGE, 1).withStartupTimeout(Duration.ofSeconds(60));
withExposedPorts(8080);
Expand Down Expand Up @@ -103,6 +111,17 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "create message-vpn " + vpn);
updateConfigScript(scriptBuilder, "no shutdown");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "client-profile default message-vpn " + vpn);
updateConfigScript(scriptBuilder, "message-spool");
updateConfigScript(scriptBuilder, "allow-guaranteed-message-send");
updateConfigScript(scriptBuilder, "allow-guaranteed-message-receive");
updateConfigScript(scriptBuilder, "allow-guaranteed-endpoint-create");
updateConfigScript(scriptBuilder, "allow-guaranteed-endpoint-create-durability all");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "message-spool message-vpn " + vpn);
updateConfigScript(scriptBuilder, "max-spool-usage 60000");
updateConfigScript(scriptBuilder, "exit");
}

// Configure username and password
Expand Down Expand Up @@ -235,7 +254,7 @@ public SolaceContainer withCredentials(final String username, final String passw
/**
* Adds the topic configuration
*
* @param topic Name of the topic
* @param topic Name of the topic
* @param service Service to be supported on provided topic
* @return This container.
*/
Expand All @@ -260,7 +279,7 @@ public SolaceContainer withVpn(String vpn) {
* Sets the solace server ceritificates
*
* @param certFile Server certificate
* @param caFile Certified Authority certificate
* @param caFile Certified Authority certificate
* @return This container.
*/
public SolaceContainer withClientCert(final MountableFile certFile, final MountableFile caFile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class SolaceContainerAMQPTest {
@Test
public void testSolaceContainer() throws JMSException {
try (
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
.withTopic(TOPIC_NAME, Service.AMQP)
.withVpn("amqp-vpn")
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class SolaceContainerMQTTTest {
@Test
public void testSolaceContainer() {
try (
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
.withTopic(TOPIC_NAME, Service.MQTT)
.withVpn("mqtt-vpn")
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SolaceContainerRESTTest {
@Test
public void testSolaceContainer() throws IOException {
try (
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
.withTopic(TOPIC_NAME, Service.REST)
.withVpn("rest-vpn")
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.testcontainers.solace;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.TextMessage;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageConsumer;
Expand All @@ -30,40 +33,77 @@ public class SolaceContainerSMFTest {

private static final Topic TOPIC = JCSMPFactory.onlyInstance().createTopic("Topic/ActualTopic");

private static final Queue QUEUE = JCSMPFactory.onlyInstance().createQueue("Queue");

@Test
public void testSolaceContainerWithSimpleAuthentication() {
try (
// solaceContainerSetup {
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.2")
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
.withCredentials("user", "pass")
.withTopic("Topic/ActualTopic", Service.SMF)
.withTopic(TOPIC.getName(), Service.SMF)
.withVpn("test_vpn")
// }
) {
solaceContainer.start();
JCSMPSession session = createSessionWithBasicAuth(solaceContainer);
assertThat(session).isNotNull();
assertThat(consumeMessageFromSolace(session)).isEqualTo(MESSAGE);
consumeMessageFromTopics(session);
session.closeSession();
}
}

@Test
public void testSolaceContainerWithCreateFlow() {
try (
// solaceContainerSetup {
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
.withCredentials("user", "pass")
.withTopic(TOPIC.getName(), Service.SMF)
.withVpn("test_vpn")
// }
) {
solaceContainer.start();
JCSMPSession session = createSessionWithBasicAuth(solaceContainer);
assertThat(session).isNotNull();
testCreateFlow(session);
session.closeSession();
}
}

private static void testCreateFlow(JCSMPSession session) {
try {
EndpointProperties endpointProperties = new EndpointProperties();
endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);
endpointProperties.setQuota(1000);
session.provision(QUEUE, endpointProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
session.addSubscription(QUEUE, TOPIC, JCSMPSession.WAIT_FOR_CONFIRM);
ConsumerFlowProperties flowProperties = new ConsumerFlowProperties().setEndpoint(QUEUE);
TestConsumer listener = new TestConsumer();
session.createFlow(listener, flowProperties).start();
publishMessageToSolaceTopic(session);
listener.waitForMessage();
} catch (Exception e) {
throw new RuntimeException("Cannot process message using solace topic/queue: " + e.getMessage(), e);
}
}

@Test
public void testSolaceContainerWithCertificates() {
try (
// solaceContainerUsageSSL {
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.6")
SolaceContainer solaceContainer = new SolaceContainer("solace/solace-pubsub-standard:10.25.0")
.withClientCert(
MountableFile.forClasspathResource("solace.pem"),
MountableFile.forClasspathResource("rootCA.crt")
)
.withTopic("Topic/ActualTopic", Service.SMF_SSL)
.withTopic(TOPIC.getName(), Service.SMF_SSL)
// }
) {
solaceContainer.start();
JCSMPSession session = createSessionWithCertificates(solaceContainer);
assertThat(session).isNotNull();
assertThat(consumeMessageFromSolace(session)).isEqualTo(MESSAGE);
consumeMessageFromTopics(session);
session.closeSession();
}
}
Expand Down Expand Up @@ -112,7 +152,7 @@ private static JCSMPSession createSession(JCSMPProperties properties) {
}
}

private void publishMessageToSolace(JCSMPSession session) throws JCSMPException {
private static void publishMessageToSolaceTopic(JCSMPSession session) throws JCSMPException {
XMLMessageProducer producer = session.getMessageProducer(
new JCSMPStreamingPublishCorrelatingEventHandler() {
@Override
Expand All @@ -131,37 +171,49 @@ public void handleErrorEx(Object o, JCSMPException e, long l) {
producer.send(msg, TOPIC);
}

private String consumeMessageFromSolace(JCSMPSession session) {
CountDownLatch latch = new CountDownLatch(1);
private static void consumeMessageFromTopics(JCSMPSession session) {
try {
String[] result = new String[1];
XMLMessageConsumer cons = session.getMessageConsumer(
new XMLMessageListener() {
@Override
public void onReceive(BytesXMLMessage msg) {
if (msg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) msg;
String message = textMessage.getText();
result[0] = message;
LOGGER.info("TextMessage received: " + message);
}
latch.countDown();
}

@Override
public void onException(JCSMPException e) {
LOGGER.error("Exception received: " + e.getMessage());
latch.countDown();
}
}
);
TestConsumer listener = new TestConsumer();
XMLMessageConsumer cons = session.getMessageConsumer(listener);
session.addSubscription(TOPIC);
cons.start();
publishMessageToSolace(session);
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
return result[0];
publishMessageToSolaceTopic(session);
listener.waitForMessage();
} catch (Exception e) {
throw new RuntimeException("Cannot receive message from solace", e);
throw new RuntimeException("Cannot process message using solace: " + e.getMessage(), e);
}
}

static class TestConsumer implements XMLMessageListener {

private final CountDownLatch latch = new CountDownLatch(1);

private String result;

@Override
public void onReceive(BytesXMLMessage msg) {
if (msg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) msg;
String message = textMessage.getText();
result = message;
LOGGER.info("Message received: " + message);
}
latch.countDown();
}

@Override
public void onException(JCSMPException e) {
LOGGER.error("Exception received: " + e.getMessage());
latch.countDown();
}

private void waitForMessage() {
try {
assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
assertThat(result).isEqualTo(MESSAGE);
} catch (Exception e) {
throw new RuntimeException("Cannot receive message from solace: " + e.getMessage(), e);
}
}
}
}
Loading