11package org .testcontainers .containers ;
22
3- import org .apache .pulsar .client .admin .ListTopicsOptions ;
43import org .apache .pulsar .client .admin .PulsarAdmin ;
54import org .apache .pulsar .client .admin .PulsarAdminException ;
6- import org .apache .pulsar .client .api .Consumer ;
7- import org .apache .pulsar .client .api .Message ;
8- import org .apache .pulsar .client .api .Producer ;
9- import org .apache .pulsar .client .api .PulsarClient ;
10- import org .apache .pulsar .client .api .Schema ;
11- import org .apache .pulsar .client .api .SubscriptionInitialPosition ;
12- import org .apache .pulsar .client .api .transaction .Transaction ;
135import org .junit .Test ;
146import org .testcontainers .utility .DockerImageName ;
157
168import java .time .Duration ;
17- import java .util .List ;
18- import java .util .concurrent .CompletableFuture ;
19- import java .util .concurrent .TimeUnit ;
209
2110import static org .assertj .core .api .Assertions .assertThat ;
2211import static org .assertj .core .api .Assertions .assertThatThrownBy ;
2312
24- public class PulsarContainerTest {
25-
26- public static final String TEST_TOPIC = "test_topic" ;
13+ public class PulsarContainerTest extends AbstractPulsar {
2714
2815 private static final DockerImageName PULSAR_IMAGE = DockerImageName .parse ("apachepulsar/pulsar:3.0.0" );
2916
@@ -84,7 +71,8 @@ public void shouldNotEnableFunctionsWorkerByDefault() throws Exception {
8471 public void shouldWaitForFunctionsWorkerStarted () throws Exception {
8572 try (
8673 // constructorWithFunctionsWorker {
87- PulsarContainer pulsar = new PulsarContainer (PULSAR_IMAGE ).withFunctionsWorker ();
74+ PulsarContainer pulsar = new PulsarContainer (DockerImageName .parse ("apachepulsar/pulsar:3.0.0" ))
75+ .withFunctionsWorker ();
8876 // }
8977 ) {
9078 pulsar .start ();
@@ -111,13 +99,6 @@ public void testTransactions() throws Exception {
11199 }
112100 }
113101
114- private void assertTransactionsTopicCreated (PulsarAdmin pulsarAdmin ) throws PulsarAdminException {
115- final List <String > topics = pulsarAdmin
116- .topics ()
117- .getPartitionedTopicList ("pulsar/system" , ListTopicsOptions .builder ().includeSystemTopic (true ).build ());
118- assertThat (topics ).contains ("persistent://pulsar/system/transaction_coordinator_assign" );
119- }
120-
121102 @ Test
122103 public void testTransactionsAndFunctionsWorker () throws Exception {
123104 try (PulsarContainer pulsar = new PulsarContainer (PULSAR_IMAGE ).withTransactions ().withFunctionsWorker ()) {
@@ -149,41 +130,4 @@ public void testStartupTimeoutIsHonored() {
149130 .hasRootCauseMessage ("Precondition failed: timeout must be greater than zero" );
150131 }
151132 }
152-
153- protected void testPulsarFunctionality (String pulsarBrokerUrl ) throws Exception {
154- try (
155- PulsarClient client = PulsarClient .builder ().serviceUrl (pulsarBrokerUrl ).build ();
156- Consumer consumer = client .newConsumer ().topic (TEST_TOPIC ).subscriptionName ("test-subs" ).subscribe ();
157- Producer <byte []> producer = client .newProducer ().topic (TEST_TOPIC ).create ()
158- ) {
159- producer .send ("test containers" .getBytes ());
160- CompletableFuture <Message > future = consumer .receiveAsync ();
161- Message message = future .get (5 , TimeUnit .SECONDS );
162-
163- assertThat (new String (message .getData ())).isEqualTo ("test containers" );
164- }
165- }
166-
167- protected void testTransactionFunctionality (String pulsarBrokerUrl ) throws Exception {
168- try (
169- PulsarClient client = PulsarClient .builder ().serviceUrl (pulsarBrokerUrl ).enableTransaction (true ).build ();
170- Consumer <String > consumer = client
171- .newConsumer (Schema .STRING )
172- .topic ("transaction-topic" )
173- .subscriptionInitialPosition (SubscriptionInitialPosition .Earliest )
174- .subscriptionName ("test-transaction-sub" )
175- .subscribe ();
176- Producer <String > producer = client
177- .newProducer (Schema .STRING )
178- .sendTimeout (0 , TimeUnit .SECONDS )
179- .topic ("transaction-topic" )
180- .create ()
181- ) {
182- final Transaction transaction = client .newTransaction ().build ().get ();
183- producer .newMessage (transaction ).value ("first" ).send ();
184- transaction .commit ();
185- Message <String > message = consumer .receive ();
186- assertThat (message .getValue ()).isEqualTo ("first" );
187- }
188- }
189133}
0 commit comments