11package org .testcontainers .solace ;
22
33import com .solacesystems .jcsmp .BytesXMLMessage ;
4+ import com .solacesystems .jcsmp .ConsumerFlowProperties ;
5+ import com .solacesystems .jcsmp .EndpointProperties ;
46import com .solacesystems .jcsmp .JCSMPException ;
57import com .solacesystems .jcsmp .JCSMPFactory ;
68import com .solacesystems .jcsmp .JCSMPProperties ;
79import com .solacesystems .jcsmp .JCSMPSession ;
810import com .solacesystems .jcsmp .JCSMPStreamingPublishCorrelatingEventHandler ;
11+ import com .solacesystems .jcsmp .Queue ;
912import com .solacesystems .jcsmp .TextMessage ;
1013import com .solacesystems .jcsmp .Topic ;
1114import com .solacesystems .jcsmp .XMLMessageConsumer ;
@@ -30,40 +33,77 @@ public class SolaceContainerSMFTest {
3033
3134 private static final Topic TOPIC = JCSMPFactory .onlyInstance ().createTopic ("Topic/ActualTopic" );
3235
36+ private static final Queue QUEUE = JCSMPFactory .onlyInstance ().createQueue ("Queue" );
37+
3338 @ Test
3439 public void testSolaceContainerWithSimpleAuthentication () {
3540 try (
3641 // solaceContainerSetup {
37- SolaceContainer solaceContainer = new SolaceContainer ("solace/solace-pubsub-standard:10.2 " )
42+ SolaceContainer solaceContainer = new SolaceContainer ("solace/solace-pubsub-standard:10.25.0 " )
3843 .withCredentials ("user" , "pass" )
39- .withTopic ("Topic/ActualTopic" , Service .SMF )
44+ .withTopic (TOPIC . getName () , Service .SMF )
4045 .withVpn ("test_vpn" )
4146 // }
4247 ) {
4348 solaceContainer .start ();
4449 JCSMPSession session = createSessionWithBasicAuth (solaceContainer );
4550 assertThat (session ).isNotNull ();
46- assertThat ( consumeMessageFromSolace ( session )). isEqualTo ( MESSAGE );
51+ consumeMessageFromTopics ( session );
4752 session .closeSession ();
4853 }
4954 }
5055
56+ @ Test
57+ public void testSolaceContainerWithCreateFlow () {
58+ try (
59+ // solaceContainerSetup {
60+ SolaceContainer solaceContainer = new SolaceContainer ("solace/solace-pubsub-standard:10.25.0" )
61+ .withCredentials ("user" , "pass" )
62+ .withTopic (TOPIC .getName (), Service .SMF )
63+ .withVpn ("test_vpn" )
64+ // }
65+ ) {
66+ solaceContainer .start ();
67+ JCSMPSession session = createSessionWithBasicAuth (solaceContainer );
68+ assertThat (session ).isNotNull ();
69+ testCreateFlow (session );
70+ session .closeSession ();
71+ }
72+ }
73+
74+ private static void testCreateFlow (JCSMPSession session ) {
75+ try {
76+ EndpointProperties endpointProperties = new EndpointProperties ();
77+ endpointProperties .setAccessType (EndpointProperties .ACCESSTYPE_NONEXCLUSIVE );
78+ endpointProperties .setQuota (1000 );
79+ session .provision (QUEUE , endpointProperties , JCSMPSession .FLAG_IGNORE_ALREADY_EXISTS );
80+ session .addSubscription (QUEUE , TOPIC , JCSMPSession .WAIT_FOR_CONFIRM );
81+ ConsumerFlowProperties flowProperties = new ConsumerFlowProperties ().setEndpoint (QUEUE );
82+ TestConsumer listener = new TestConsumer ();
83+ session .createFlow (listener , flowProperties ).start ();
84+ publishMessageToSolaceTopic (session );
85+ listener .waitForMessage ();
86+ } catch (Exception e ) {
87+ throw new RuntimeException ("Cannot process message using solace topic/queue: " + e .getMessage (), e );
88+ }
89+ }
90+
5191 @ Test
5292 public void testSolaceContainerWithCertificates () {
5393 try (
5494 // solaceContainerUsageSSL {
55- SolaceContainer solaceContainer = new SolaceContainer ("solace/solace-pubsub-standard:10.6 " )
95+ SolaceContainer solaceContainer = new SolaceContainer ("solace/solace-pubsub-standard:10.25.0 " )
5696 .withClientCert (
5797 MountableFile .forClasspathResource ("solace.pem" ),
5898 MountableFile .forClasspathResource ("rootCA.crt" )
5999 )
60- .withTopic ("Topic/ActualTopic" , Service .SMF_SSL )
100+ .withTopic (TOPIC . getName () , Service .SMF_SSL )
61101 // }
62102 ) {
63103 solaceContainer .start ();
64104 JCSMPSession session = createSessionWithCertificates (solaceContainer );
65105 assertThat (session ).isNotNull ();
66- assertThat ( consumeMessageFromSolace ( session )). isEqualTo ( MESSAGE );
106+ consumeMessageFromTopics ( session );
67107 session .closeSession ();
68108 }
69109 }
@@ -112,7 +152,7 @@ private static JCSMPSession createSession(JCSMPProperties properties) {
112152 }
113153 }
114154
115- private void publishMessageToSolace (JCSMPSession session ) throws JCSMPException {
155+ private static void publishMessageToSolaceTopic (JCSMPSession session ) throws JCSMPException {
116156 XMLMessageProducer producer = session .getMessageProducer (
117157 new JCSMPStreamingPublishCorrelatingEventHandler () {
118158 @ Override
@@ -131,37 +171,49 @@ public void handleErrorEx(Object o, JCSMPException e, long l) {
131171 producer .send (msg , TOPIC );
132172 }
133173
134- private String consumeMessageFromSolace (JCSMPSession session ) {
135- CountDownLatch latch = new CountDownLatch (1 );
174+ private static void consumeMessageFromTopics (JCSMPSession session ) {
136175 try {
137- String [] result = new String [1 ];
138- XMLMessageConsumer cons = session .getMessageConsumer (
139- new XMLMessageListener () {
140- @ Override
141- public void onReceive (BytesXMLMessage msg ) {
142- if (msg instanceof TextMessage ) {
143- TextMessage textMessage = (TextMessage ) msg ;
144- String message = textMessage .getText ();
145- result [0 ] = message ;
146- LOGGER .info ("TextMessage received: " + message );
147- }
148- latch .countDown ();
149- }
150-
151- @ Override
152- public void onException (JCSMPException e ) {
153- LOGGER .error ("Exception received: " + e .getMessage ());
154- latch .countDown ();
155- }
156- }
157- );
176+ TestConsumer listener = new TestConsumer ();
177+ XMLMessageConsumer cons = session .getMessageConsumer (listener );
158178 session .addSubscription (TOPIC );
159179 cons .start ();
160- publishMessageToSolace (session );
161- assertThat (latch .await (10L , TimeUnit .SECONDS )).isTrue ();
162- return result [0 ];
180+ publishMessageToSolaceTopic (session );
181+ listener .waitForMessage ();
163182 } catch (Exception e ) {
164- throw new RuntimeException ("Cannot receive message from solace" , e );
183+ throw new RuntimeException ("Cannot process message using solace: " + e .getMessage (), e );
184+ }
185+ }
186+
187+ static class TestConsumer implements XMLMessageListener {
188+
189+ private final CountDownLatch latch = new CountDownLatch (1 );
190+
191+ private String result ;
192+
193+ @ Override
194+ public void onReceive (BytesXMLMessage msg ) {
195+ if (msg instanceof TextMessage ) {
196+ TextMessage textMessage = (TextMessage ) msg ;
197+ String message = textMessage .getText ();
198+ result = message ;
199+ LOGGER .info ("Message received: " + message );
200+ }
201+ latch .countDown ();
202+ }
203+
204+ @ Override
205+ public void onException (JCSMPException e ) {
206+ LOGGER .error ("Exception received: " + e .getMessage ());
207+ latch .countDown ();
208+ }
209+
210+ private void waitForMessage () {
211+ try {
212+ assertThat (latch .await (10L , TimeUnit .SECONDS )).isTrue ();
213+ assertThat (result ).isEqualTo (MESSAGE );
214+ } catch (Exception e ) {
215+ throw new RuntimeException ("Cannot receive message from solace: " + e .getMessage (), e );
216+ }
165217 }
166218 }
167219}
0 commit comments