diff --git a/pom.xml b/pom.xml
index b7db4e586ba55..48fee44bb2ed5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2565,6 +2565,9 @@ flexible messaging model and an intuitive client API.
pulsar-package-management
+ pulsar-inttest-lib
+ pulsar-inttest-client
+
distribution
docker
@@ -2621,6 +2624,9 @@ flexible messaging model and an intuitive client API.
pulsar-client-messagecrypto-bc
+ pulsar-inttest-lib
+ pulsar-inttest-client
+
distribution
pulsar-metadata
diff --git a/pulsar-inttest-client/pom.xml b/pulsar-inttest-client/pom.xml
new file mode 100644
index 0000000000000..70ff707364693
--- /dev/null
+++ b/pulsar-inttest-client/pom.xml
@@ -0,0 +1,100 @@
+
+
+
+ 4.0.0
+
+ org.apache.pulsar
+ pulsar
+ 4.1.0-SNAPSHOT
+
+
+ pulsar-inttest-client
+ Pulsar Integration Common Client Test Classes
+
+
+
+ org.apache.pulsar
+ pulsar-inttest-lib
+ ${project.version}
+
+
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
+
+
+ org.apache.pulsar
+ pulsar-client-api
+ ${project.version}
+
+
+ org.apache.pulsar
+ pulsar-client-admin-api
+ ${project.version}
+
+
+ org.awaitility
+ awaitility
+ ${awaitility.version}
+ compile
+
+
+
+
+
+
+ org.gaul
+ modernizer-maven-plugin
+
+ true
+ 8
+
+
+
+ modernizer
+ verify
+
+ modernizer
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ checkstyle
+ verify
+
+ check
+
+
+
+
+
+
+
+
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/GeoRepIntegTest.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/GeoRepIntegTest.java
new file mode 100644
index 0000000000000..b23100d67972a
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/GeoRepIntegTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import java.io.IOException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class GeoRepIntegTest implements Cloneable {
+ public PulsarClient clientA;
+ public PulsarClient clientB;
+ public PulsarAdmin adminA;
+ public PulsarAdmin adminB;
+
+ public GeoRepIntegTest(PulsarClient clientA, PulsarAdmin adminA, PulsarClient clientB, PulsarAdmin adminB) {
+ this.clientA = clientA;
+ this.adminA = adminA;
+ this.clientB = clientB;
+ this.adminB = adminB;
+ }
+
+ public void close() throws IOException {
+ clientA.close();
+ clientB.close();
+ adminA.close();
+ adminB.close();
+ }
+}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/IntegTest.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/IntegTest.java
new file mode 100644
index 0000000000000..a49f50694e990
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/IntegTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import java.io.IOException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class IntegTest implements Cloneable {
+ public PulsarClient client;
+ public PulsarAdmin admin;
+
+ public IntegTest(PulsarClient client, PulsarAdmin admin) {
+ this.client = client;
+ this.admin = admin;
+ }
+
+ public void close() throws IOException {
+ client.close();
+ admin.close();
+ }
+}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/DelayMessaging.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/DelayMessaging.java
new file mode 100644
index 0000000000000..d6e4c0dd2ebd0
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/DelayMessaging.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getPartitionedTopic;
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.IntegTest;
+
+/**
+ * Delay messaging test.
+ */
+@Slf4j
+public class DelayMessaging extends IntegTest {
+
+ public DelayMessaging(PulsarClient client, PulsarAdmin admin) {
+ super(client, admin);
+ }
+
+ public void delayMsgBlockTest() throws Exception {
+
+ String topic = getPartitionedTopic(admin, "testDelayMsgBlock", true, 3);
+
+ String retryTopic = topic + "-RETRY";
+ String deadLetterTopic = topic + "-DLT";
+
+ @Cleanup
+ Producer producer = client.newProducer()
+ .topic(topic)
+ .create();
+
+ final int redeliverCnt = 10;
+ final int delayTimeSeconds = 5;
+ @Cleanup
+ Consumer consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .enableRetry(true)
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(redeliverCnt)
+ .retryLetterTopic(retryTopic)
+ .deadLetterTopic(deadLetterTopic)
+ .build())
+ .receiverQueueSize(100)
+ .ackTimeout(60, TimeUnit.SECONDS)
+ .subscribe();
+
+ producer.newMessage().value("hello".getBytes()).send();
+
+ // receive message at first time
+ Message message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
+ assertThat(message).isNotNull().as("Can't receive message at the first time.");
+ consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
+
+ // receive retry messages
+ for (int i = 0; i < redeliverCnt; i++) {
+ message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
+ assertThat(message)
+ .isNotNull()
+ .as("Consumer can't receive message in double delayTimeSeconds time "
+ + delayTimeSeconds * 2 + "s");
+ log.info("receive msg. reConsumeTimes: {}", message.getProperty("RECONSUMETIMES"));
+ consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
+ }
+
+ @Cleanup
+ Consumer dltConsumer = client.newConsumer()
+ .topic(deadLetterTopic)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test")
+ .subscribe();
+
+ message = dltConsumer.receive(10, TimeUnit.SECONDS);
+ assertThat(message).isNotNull().as("Dead letter topic consumer can't receive message.");
+ }
+
+}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/GeoReplication.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/GeoReplication.java
new file mode 100644
index 0000000000000..03b14ae9a1d7d
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/GeoReplication.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.tests.integration.GeoRepIntegTest;
+import org.awaitility.Awaitility;
+
+/**
+ * Geo replication test.
+ */
+@Slf4j
+public class GeoReplication extends GeoRepIntegTest {
+
+ public GeoReplication(PulsarClient clientA, PulsarAdmin adminA, PulsarClient clientB, PulsarAdmin adminB) {
+ super(clientA, adminA, clientB, adminB);
+ }
+
+ public void testTopicReplication(String domain) throws Exception {
+
+ String topic = domain + "://public/default/testTopicReplication-" + UUID.randomUUID();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ try {
+ adminA.topics().createPartitionedTopic(topic, 10);
+ } catch (Exception e) {
+ log.error("Failed to create partitioned topic {}.", topic, e);
+ fail("Failed to create partitioned topic " + topic);
+ }
+ assertThat(adminA.topics().getPartitionedTopicMetadata(topic).partitions).isEqualTo(10);
+ });
+ log.info("Test geo-replication produce and consume for topic {}.", topic);
+
+ @Cleanup
+ Producer p = clientA.newProducer()
+ .topic(topic)
+ .create();
+ log.info("Successfully create producer in cluster {} for topic {}.", "cluster1", topic);
+
+ @Cleanup
+ Consumer c = clientB.newConsumer()
+ .topic(topic)
+ .subscriptionName("geo-sub")
+ .subscribe();
+ log.info("Successfully create consumer in cluster {} for topic {}.", "cluster2", topic);
+
+ for (int i = 0; i < 10; i++) {
+ p.send(String.format("Message [%d]", i).getBytes(StandardCharsets.UTF_8));
+ }
+ log.info("Successfully produce message to cluster {} for topic {}.", "cluster1", topic);
+
+ for (int i = 0; i < 10; i++) {
+ Message message = c.receive(10, TimeUnit.SECONDS);
+ assertThat(message).isNotNull();
+ }
+ log.info("Successfully consume message from cluster {} for topic {}.", "cluster2", topic);
+ }
+}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessaging.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessaging.java
new file mode 100644
index 0000000000000..55b3d7e5c11d7
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessaging.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.tests.integration.IntegTest;
+
+public class NonDurableConsumerMessaging extends IntegTest {
+
+ public NonDurableConsumerMessaging(PulsarClient client, PulsarAdmin admin) {
+ super(client, admin);
+ }
+
+ public void testNonDurableConsumer() throws Exception {
+ final String topicName = getNonPartitionedTopic(admin, "test-non-durable-consumer", false);
+
+ int numMessages = 20;
+
+ try (final Producer producer = client.newProducer()
+ .topic(topicName)
+ .create()) {
+
+ IntStream.range(0, numMessages).forEach(i -> {
+ String payload = "message-" + i;
+ producer.sendAsync(payload.getBytes(UTF_8));
+ });
+ // flush the producer to make sure all messages are persisted
+ producer.flush();
+
+ try (final Consumer consumer = client.newConsumer()
+ .topic(topicName)
+ .subscriptionName("non-durable-consumer")
+ .subscriptionMode(SubscriptionMode.NonDurable)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
+
+ for (int i = 0; i < numMessages; i++) {
+ Message msg = consumer.receive();
+ assertThat(new String(msg.getValue(), UTF_8)).isEqualTo("message-" + i);
+ }
+ }
+ }
+
+ }
+}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/ReaderMessaging.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/ReaderMessaging.java
new file mode 100644
index 0000000000000..972c763129086
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/ReaderMessaging.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.IntegTest;
+
+@Slf4j
+public class ReaderMessaging extends IntegTest {
+
+ public ReaderMessaging(PulsarClient client, PulsarAdmin admin) {
+ super(client, admin);
+ }
+
+ public void testReaderReconnectAndRead() throws Exception {
+ log.info("-- Starting testReaderReconnectAndRead test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-reader-reconnect-read", false);
+ @Cleanup final Reader reader = client.newReader(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ // Here we need to make sure that setting the startMessageId should not cause a change in the
+ // behavior of the reader under non.
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ final int messagesToSend = 10;
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value("message-" + i).send();
+ assertThat(messageId).isNotNull();
+ }
+
+ for (int i = 0; i < messagesToSend; i++) {
+ Message msg = reader.readNext();
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
+ }
+
+ admin.topics().unload(topicName);
+
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value("message-" + i).send();
+ assertThat(messageId).isNotNull();
+ }
+
+ for (int i = 0; i < messagesToSend; i++) {
+ Message msg = reader.readNext();
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
+ }
+
+ log.info("-- Exiting testReaderReconnectAndRead test --");
+ }
+
+ public void testReaderReconnectAndReadBatchMessages()
+ throws Exception {
+ log.info("-- Starting testReaderReconnectAndReadBatchMessages test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-reader-reconnect-read-batch", false);
+ @Cleanup final Reader reader = client.newReader(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ // Here we need to make sure that setting the startMessageId should not cause a change in the
+ // behavior of the reader under non.
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ final int messagesToSend = 10;
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+ .batchingMaxMessages(5)
+ .create();
+
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value("message-" + i).send();
+ assertThat(messageId).isNotNull();
+ }
+
+ for (int i = 0; i < messagesToSend; i++) {
+ Message msg = reader.readNext();
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
+ }
+
+ admin.topics().unload(topicName);
+
+ for (int i = 0; i < messagesToSend; i++) {
+ MessageId messageId = producer.newMessage().value("message-" + i).send();
+ assertThat(messageId).isNotNull();
+ }
+
+ for (int i = 0; i < messagesToSend; i++) {
+ Message msg = reader.readNext();
+ assertThat(msg.getValue()).isEqualTo("message-" + i);
+ }
+
+ log.info("-- Exiting testReaderReconnectAndReadBatchMessages test --");
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessaging.java
similarity index 55%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
rename to pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessaging.java
index 97e642adf37db..6a9e4e7be56a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/TopicMessagingBase.java
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/TopicMessaging.java
@@ -18,14 +18,26 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
+import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
+import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getPartitionedTopic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
@@ -33,24 +45,19 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import org.apache.pulsar.tests.integration.IntegTest;
@Slf4j
-public class TopicMessagingBase extends MessagingBase {
+public class TopicMessaging extends IntegTest {
- protected void nonPartitionedTopicSendAndReceiveWithExclusive(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
- final String topicName = getNonPartitionedTopic("test-non-partitioned-consume-exclusive", isPersistent);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
- @Cleanup
- final Consumer consumer = client.newConsumer(Schema.STRING)
+ public TopicMessaging(PulsarClient client, PulsarAdmin admin) {
+ super(client, admin);
+ }
+
+ public void nonPartitionedTopicSendAndReceiveWithExclusive(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithExclusive test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-exclusive", isPersistent);
+ @Cleanup final Consumer consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Exclusive)
@@ -66,29 +73,24 @@ protected void nonPartitionedTopicSendAndReceiveWithExclusive(String serviceUrl,
}
final int messagesToSend = 10;
final String producerName = "producerForExclusive";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(Collections.singletonList(consumer), messagesToSend);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithExclusive test --");
}
- protected void partitionedTopicSendAndReceiveWithExclusive(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
+ public void partitionedTopicSendAndReceiveWithExclusive(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithExclusive test --");
final int partitions = 3;
- String topicName = getPartitionedTopic("test-partitioned-consume-exclusive", isPersistent, partitions);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-exclusive", isPersistent, partitions);
List> consumerList = new ArrayList<>(3);
for (int i = 0; i < partitions; i++) {
Consumer consumer = client.newConsumer(Schema.STRING)
@@ -98,7 +100,7 @@ protected void partitionedTopicSendAndReceiveWithExclusive(String serviceUrl, bo
.subscribe();
consumerList.add(consumer);
}
- assertEquals(partitions, consumerList.size());
+ assertThat(consumerList.size()).isEqualTo(partitions);
try {
client.newConsumer(Schema.STRING)
.topic(topicName + "-partition-" + 0)
@@ -110,8 +112,7 @@ protected void partitionedTopicSendAndReceiveWithExclusive(String serviceUrl, bo
}
final int messagesToSend = 9;
final String producerName = "producerForExclusive";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
@@ -119,7 +120,7 @@ protected void partitionedTopicSendAndReceiveWithExclusive(String serviceUrl, bo
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
@@ -128,20 +129,16 @@ protected void partitionedTopicSendAndReceiveWithExclusive(String serviceUrl, bo
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend - 3);
closeConsumers(consumerList);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithExclusive test --");
}
- protected void nonPartitionedTopicSendAndReceiveWithFailover(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
- final String topicName = getNonPartitionedTopic("test-non-partitioned-consume-failover", isPersistent);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ public void nonPartitionedTopicSendAndReceiveWithFailover(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithFailover test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-failover", isPersistent);
List> consumerList = new ArrayList<>(2);
final Consumer consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
@@ -154,20 +151,19 @@ protected void nonPartitionedTopicSendAndReceiveWithFailover(String serviceUrl,
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
- assertNotNull(standbyConsumer);
- assertTrue(standbyConsumer.isConnected());
+ assertThat(standbyConsumer).isNotNull();
+ assertThat(standbyConsumer.isConnected()).isTrue();
consumerList.add(standbyConsumer);
final int messagesToSend = 10;
final String producerName = "producerForFailover";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
@@ -177,22 +173,18 @@ protected void nonPartitionedTopicSendAndReceiveWithFailover(String serviceUrl,
Thread.sleep(3000);
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
- MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithFailover test --");
}
- protected void partitionedTopicSendAndReceiveWithFailover(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
+ public void partitionedTopicSendAndReceiveWithFailover(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithFailover test --");
final int partitions = 3;
- String topicName = getPartitionedTopic("test-partitioned-consume-failover", isPersistent, partitions);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-failover", isPersistent, partitions);
List> consumerList = new ArrayList<>(3);
Consumer consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
@@ -205,21 +197,20 @@ protected void partitionedTopicSendAndReceiveWithFailover(String serviceUrl, boo
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
- assertNotNull(standbyConsumer);
- assertTrue(standbyConsumer.isConnected());
+ assertThat(standbyConsumer).isNotNull();
+ assertThat(standbyConsumer.isConnected()).isTrue();
consumerList.add(standbyConsumer);
- assertEquals(consumerList.size(), 2);
+ assertThat(consumerList.size()).isEqualTo(2);
final int messagesToSend = 9;
final String producerName = "producerForFailover";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
@@ -230,20 +221,16 @@ protected void partitionedTopicSendAndReceiveWithFailover(String serviceUrl, boo
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckOrderAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithFailover test --");
}
- protected void nonPartitionedTopicSendAndReceiveWithShared(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
- final String topicName = getNonPartitionedTopic("test-non-partitioned-consume-shared", isPersistent);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ public void nonPartitionedTopicSendAndReceiveWithShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithShared test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-shared", isPersistent);
List> consumerList = new ArrayList<>(2);
final Consumer consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
@@ -256,20 +243,19 @@ protected void nonPartitionedTopicSendAndReceiveWithShared(String serviceUrl, bo
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
- assertNotNull(moreConsumer);
- assertTrue(moreConsumer.isConnected());
+ assertThat(moreConsumer).isNotNull();
+ assertThat(moreConsumer.isConnected()).isTrue();
consumerList.add(moreConsumer);
final int messagesToSend = 10;
final String producerName = "producerForShared";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
@@ -278,21 +264,17 @@ protected void nonPartitionedTopicSendAndReceiveWithShared(String serviceUrl, bo
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithShared test --");
}
- protected void partitionedTopicSendAndReceiveWithShared(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
+ public void partitionedTopicSendAndReceiveWithShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithShared test --");
final int partitions = 3;
- String topicName = getPartitionedTopic("test-partitioned-consume-shared", isPersistent, partitions);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-shared", isPersistent, partitions);
List> consumerList = new ArrayList<>(3);
for (int i = 0; i < partitions; i++) {
Consumer consumer = client.newConsumer(Schema.STRING)
@@ -302,18 +284,17 @@ protected void partitionedTopicSendAndReceiveWithShared(String serviceUrl, boole
.subscribe();
consumerList.add(consumer);
}
- assertEquals(partitions, consumerList.size());
+ assertThat(consumerList.size()).isEqualTo(partitions);
final int messagesToSend = 10;
final String producerName = "producerForFailover";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("public messages complete.");
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
@@ -322,40 +303,35 @@ protected void partitionedTopicSendAndReceiveWithShared(String serviceUrl, boole
crashedConsumer.close();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithShared test --");
}
- protected void nonPartitionedTopicSendAndReceiveWithKeyShared(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
- final String topicName = getNonPartitionedTopic("test-non-partitioned-consume-key-shared", isPersistent);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ public void nonPartitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting nonPartitionedTopicSendAndReceiveWithKeyShared test --");
+ final String topicName = getNonPartitionedTopic(admin, "test-non-partitioned-consume-key-shared", isPersistent);
List> consumerList = new ArrayList<>(2);
Consumer consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertTrue(consumer.isConnected());
+ assertThat(consumer.isConnected()).isTrue();
consumerList.add(consumer);
Consumer moreConsumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertNotNull(moreConsumer);
- assertTrue(moreConsumer.isConnected());
+ assertThat(moreConsumer).isNotNull();
+ assertThat(moreConsumer.isConnected()).isTrue();
consumerList.add(moreConsumer);
final int messagesToSend = 10;
final String producerName = "producerForKeyShared";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
@@ -365,7 +341,7 @@ protected void nonPartitionedTopicSendAndReceiveWithKeyShared(String serviceUrl,
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("publish messages complete.");
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
@@ -377,41 +353,36 @@ protected void nonPartitionedTopicSendAndReceiveWithKeyShared(String serviceUrl,
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting nonPartitionedTopicSendAndReceiveWithKeyShared test --");
}
- protected void partitionedTopicSendAndReceiveWithKeyShared(String serviceUrl, boolean isPersistent) throws Exception {
- log.info("-- Starting {} test --", methodName);
+ public void partitionedTopicSendAndReceiveWithKeyShared(boolean isPersistent) throws Exception {
+ log.info("-- Starting partitionedTopicSendAndReceiveWithKeyShared test --");
final int partitions = 3;
- String topicName = getPartitionedTopic("test-partitioned-consume-key-shared", isPersistent, partitions);
- @Cleanup
- final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ String topicName = getPartitionedTopic(admin, "test-partitioned-consume-key-shared", isPersistent, partitions);
List> consumerList = new ArrayList<>(2);
Consumer consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertTrue(consumer.isConnected());
+ assertThat(consumer.isConnected()).isTrue();
consumerList.add(consumer);
Consumer moreConsumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
- assertNotNull(moreConsumer);
- assertTrue(moreConsumer.isConnected());
+ assertThat(moreConsumer).isNotNull();
+ assertThat(moreConsumer.isConnected()).isTrue();
consumerList.add(moreConsumer);
final int messagesToSend = 10;
final String producerName = "producerForKeyShared";
- @Cleanup
- final Producer producer = client.newProducer(Schema.STRING)
+ @Cleanup final Producer producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.producerName(producerName)
@@ -421,7 +392,7 @@ protected void partitionedTopicSendAndReceiveWithKeyShared(String serviceUrl, bo
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
log.info("publish messages complete.");
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
@@ -433,11 +404,118 @@ protected void partitionedTopicSendAndReceiveWithKeyShared(String serviceUrl, bo
.key(UUID.randomUUID().toString())
.value(producer.getProducerName() + "-" + i)
.send();
- assertNotNull(messageId);
+ assertThat(messageId).isNotNull();
}
receiveMessagesCheckStickyKeyAndDuplicate(consumerList, messagesToSend);
closeConsumers(consumerList);
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting partitionedTopicSendAndReceiveWithKeyShared test --");
+ }
+
+
+ protected static > void receiveMessagesCheckOrderAndDuplicate
+ (List> consumerList, int messagesToReceive) throws PulsarClientException {
+ Set messagesReceived = new HashSet<>();
+ for (Consumer consumer : consumerList) {
+ Message currentReceived;
+ Map> lastReceivedMap = new HashMap<>();
+ while (true) {
+ try {
+ currentReceived = consumer.receive(3, TimeUnit.SECONDS);
+ } catch (PulsarClientException e) {
+ log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
+ break;
+ }
+ // Make sure that messages are received in order
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ if (lastReceivedMap.containsKey(currentReceived.getTopicName())) {
+ assertThat(currentReceived.getMessageId().compareTo(
+ lastReceivedMap.get(currentReceived.getTopicName()).getMessageId()) > 0)
+ .as("Received messages are not in order.")
+ .isTrue();
+ }
+ } else {
+ break;
+ }
+ lastReceivedMap.put(currentReceived.getTopicName(), currentReceived);
+ // Make sure that there are no duplicates
+ assertThat(messagesReceived.add(currentReceived.getValue())).
+ isTrue()
+ .as("Received duplicate message " + currentReceived.getValue());
+ }
+ }
+ assertThat(messagesReceived.size()).isEqualTo(messagesToReceive);
+ }
+
+ protected static void receiveMessagesCheckDuplicate
+ (List> consumerList, int messagesToReceive) throws PulsarClientException {
+ Set messagesReceived = new HashSet<>();
+ for (Consumer consumer : consumerList) {
+ Message currentReceived = null;
+ while (true) {
+ try {
+ currentReceived = consumer.receive(3, TimeUnit.SECONDS);
+ } catch (PulsarClientException e) {
+ log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
+ break;
+ }
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ // Make sure that there are no duplicates
+ assertThat(messagesReceived.add(currentReceived.getValue()))
+ .isTrue()
+ .as("Received duplicate message " + currentReceived.getValue());
+ } else {
+ break;
+ }
+ }
+ }
+ assertThat(messagesReceived.size()).isEqualTo(messagesToReceive);
+ }
+
+ protected static void receiveMessagesCheckStickyKeyAndDuplicate
+ (List> consumerList, int messagesToReceive) throws PulsarClientException {
+ Map> consumerKeys = new HashMap<>();
+ Set messagesReceived = new HashSet<>();
+ for (Consumer consumer : consumerList) {
+ Message currentReceived;
+ while (true) {
+ try {
+ currentReceived = consumer.receive(3, TimeUnit.SECONDS);
+ } catch (PulsarClientException e) {
+ log.info("no more messages to receive for consumer {}", consumer.getConsumerName());
+ break;
+ }
+ if (currentReceived != null) {
+ consumer.acknowledge(currentReceived);
+ assertThat(currentReceived.getKey()).isNotNull();
+ consumerKeys.putIfAbsent(consumer.getConsumerName(), new HashSet<>());
+ consumerKeys.get(consumer.getConsumerName()).add(currentReceived.getKey());
+ // Make sure that there are no duplicates
+ assertThat(messagesReceived.add(currentReceived.getValue()))
+ .isTrue()
+ .as("Received duplicate message " + currentReceived.getValue());
+ } else {
+ break;
+ }
+ }
+ }
+ // Make sure key will not be distributed to multiple consumers (except null key)
+ Set allKeys = new HashSet<>();
+ consumerKeys.forEach((k, v) -> v.stream().filter(Objects::nonNull).forEach(key -> {
+ assertThat(allKeys.add(key))
+ .isTrue()
+ .as("Key " + key + " is distributed to multiple consumers");
+ }));
+ assertThat(messagesReceived.size()).isEqualTo(messagesToReceive);
+ }
+
+ protected static void closeConsumers(List> consumerList) throws PulsarClientException {
+ Iterator> iterator = consumerList.iterator();
+ while (iterator.hasNext()) {
+ iterator.next().close();
+ iterator.remove();
+ }
}
}
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/package-info.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/package-info.java
new file mode 100644
index 0000000000000..7d9f1b036c659
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/messaging/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration.messaging;
\ No newline at end of file
diff --git a/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/package-info.java b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/package-info.java
new file mode 100644
index 0000000000000..68c0199d5f32f
--- /dev/null
+++ b/pulsar-inttest-client/src/main/java/org/apache/pulsar/tests/integration/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration;
\ No newline at end of file
diff --git a/pulsar-inttest-lib/pom.xml b/pulsar-inttest-lib/pom.xml
new file mode 100644
index 0000000000000..8cde5a3b3a745
--- /dev/null
+++ b/pulsar-inttest-lib/pom.xml
@@ -0,0 +1,104 @@
+
+
+
+ 4.0.0
+
+ org.apache.pulsar
+ pulsar
+ 4.1.0-SNAPSHOT
+
+
+ pulsar-inttest-lib
+ Pulsar Integration Common Library
+
+
+
+ com.github.docker-java
+ docker-java-core
+ ${docker-java.version}
+
+
+ org.apache.pulsar
+ pulsar-client-original
+ ${project.version}
+
+
+ org.apache.pulsar
+ pulsar-client-admin-api
+ ${project.version}
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+
+
+ org.apache.httpcomponents
+ httpcore
+ ${apache-httpcomponents.version}
+
+
+ org.testng
+ testng
+ ${testng.version}
+
+
+
+
+
+
+ org.gaul
+ modernizer-maven-plugin
+
+ true
+ 8
+
+
+
+ modernizer
+ verify
+
+ modernizer
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ checkstyle
+ verify
+
+ check
+
+
+
+
+
+
+
+
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/TestRetrySupport.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/TestRetrySupport.java
new file mode 100644
index 0000000000000..fd368bd01915f
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/TestRetrySupport.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import java.lang.reflect.Method;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestResult;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Adds support for resetting the internal state of the test
+ * by calling "cleanup" and "setup" methods before running a test method
+ * after a previous test method has failed.
+ *
+ * This is useful for making test retries to work on classes which use BeforeClass
+ * and AfterClass methods to setup a test environment that is shared across all test methods in the test
+ * class.
+ *
+ * The setup method implementation must call incrementSetupNumber method and the cleanup method must call
+ * markCurrentSetupNumberCleaned method. This is required by the state tracking logic.
+ *
+ */
+public abstract class TestRetrySupport {
+ private static final Logger LOG = LoggerFactory.getLogger(TestRetrySupport.class);
+ private int currentSetupNumber;
+ private int failedSetupNumber = -1;
+ private int cleanedUpSetupNumber;
+
+ @BeforeMethod(alwaysRun = true)
+ public final void stateCheck(Method method) throws Exception {
+ // run cleanup and setup if the current setup number is the one where a failure happened
+ // this is to cleanup state before retrying
+ if (currentSetupNumber == failedSetupNumber
+ && cleanedUpSetupNumber != failedSetupNumber) {
+ LOG.info("Previous test run has failed before {}.{}, failedSetupNumber={}. Running cleanup and setup.",
+ method.getDeclaringClass().getSimpleName(), method.getName(), failedSetupNumber);
+ try {
+ cleanup();
+ } catch (Exception e) {
+ LOG.error("Cleanup failed, ignoring this.", e);
+ }
+ setup();
+ LOG.info("State cleanup finished.");
+ failedSetupNumber = -1;
+ }
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public final void failureCheck(ITestResult testResult, Method method) {
+ // track the setup number where the failure happened
+ if (!testResult.isSuccess()) {
+ LOG.info("Detected test failure in test {}.{}, currentSetupNumber={}",
+ method.getDeclaringClass().getSimpleName(), method.getName(),
+ currentSetupNumber);
+ failedSetupNumber = currentSetupNumber;
+ }
+ }
+
+ /**
+ * This method should be called in the setup method of the concrete class.
+ *
+ * This increases an internal counter and resets the failure state which are used to determine
+ * whether cleanup is needed before a test method is called.
+ *
+ */
+ protected final void incrementSetupNumber() {
+ currentSetupNumber++;
+ failedSetupNumber = -1;
+ LOG.debug("currentSetupNumber={}", currentSetupNumber);
+ }
+
+ /**
+ * This method should be called in the cleanup method of the concrete class.
+ */
+ protected final void markCurrentSetupNumberCleaned() {
+ cleanedUpSetupNumber = currentSetupNumber;
+ LOG.debug("cleanedUpSetupNumber={}", cleanedUpSetupNumber);
+ }
+
+ /**
+ * Initializes the test environment state.
+ *
+ * The implementation of this method must call incrementSetupNumber method.
+ */
+ protected abstract void setup() throws Exception;
+
+ /**
+ * Cleans up the state of the environment.
+ *
+ * The implementation of this method must call the markCurrentSetupNumberCleaned method.
+ */
+ protected abstract void cleanup() throws Exception;
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/CSContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CassandraContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/CassandraContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/CassandraContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/CassandraContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
similarity index 98%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index eb0acf33a892c..b00346681c4de 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -19,11 +19,9 @@
package org.apache.pulsar.tests.integration.containers;
import com.github.dockerjava.api.DockerClient;
-
import java.util.Base64;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
@@ -98,7 +96,8 @@ public ContainerExecResult execCmdAsUser(String userId, String... commands) thro
return DockerUtils.runCommandAsUser(userId, client, dockerId, commands);
}
- public CompletableFuture execCmdAsyncAsUser(String userId, String... commands) throws Exception {
+ public CompletableFuture execCmdAsyncAsUser(String userId, String... commands)
+ throws Exception {
DockerClient client = this.getDockerClient();
String dockerId = this.getContainerId();
return DockerUtils.runCommandAsyncAsUser(userId, client, dockerId, commands);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java
similarity index 99%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java
index 61acdae37696b..bee6fefaa8e07 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java
@@ -19,10 +19,9 @@
package org.apache.pulsar.tests.integration.containers;
-import org.testcontainers.containers.wait.strategy.Wait;
-
import java.time.Duration;
import java.time.temporal.ChronoUnit;
+import org.testcontainers.containers.wait.strategy.Wait;
public class DebeziumMsSqlContainer extends ChaosContainer {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
similarity index 99%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
index 8579f00dd9126..a8ff57d220372 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumOracleDbContainer.java
@@ -19,10 +19,9 @@
package org.apache.pulsar.tests.integration.containers;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
import java.time.Duration;
import java.time.temporal.ChronoUnit;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
public class DebeziumOracleDbContainer extends ChaosContainer {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
similarity index 60%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
index 948f3ca468165..dd00c510e9607 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
@@ -22,30 +22,28 @@
public class HdfsContainer extends ChaosContainer {
- public static final String NAME = "HDFS";
- static final Integer[] PORTS = { 8020, 8032, 8088, 9000, 10020, 19888, 50010, 50020, 50070, 50070, 50090 };
-
- private static final String IMAGE_NAME = "harisekhon/hadoop:latest";
-
- public HdfsContainer(String clusterName) {
- super(clusterName, IMAGE_NAME);
- }
-
- @Override
+ public static final String NAME = "HDFS";
+ static final Integer[] PORTS = { 8020, 8032, 8088, 9000, 10020, 19888, 50010, 50020, 50070, 50070, 50090 };
+ private static final String IMAGE_NAME = "harisekhon/hadoop:latest";
+
+ public HdfsContainer(String clusterName) {
+ super(clusterName, IMAGE_NAME);
+ }
+
+ @Override
public String getContainerName() {
return clusterName;
}
-
- @Override
- protected void configure() {
- super.configure();
- this.withNetworkAliases(NAME)
- .withExposedPorts(PORTS)
- .withCreateContainerCmdModifier(createContainerCmd -> {
- createContainerCmd.withHostName(NAME);
- createContainerCmd.withName(clusterName + "-" + NAME);
- })
- .waitingFor(new HostPortWaitStrategy());
- }
+ @Override
+ protected void configure() {
+ super.configure();
+ this.withNetworkAliases(NAME)
+ .withExposedPorts(PORTS)
+ .withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(NAME);
+ createContainerCmd.withName(clusterName + "-" + NAME);
+ })
+ .waitingFor(new HostPortWaitStrategy());
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/OxiaContainer.java
similarity index 92%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/OxiaContainer.java
index 18d2dd77b7d46..e8ef81ffbf949 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/OxiaContainer.java
@@ -17,11 +17,9 @@
* under the License.
*/
-package org.apache.pulsar.tests.integration.oxia;
+package org.apache.pulsar.tests.integration.containers;
import java.time.Duration;
-import org.apache.pulsar.tests.integration.containers.ChaosContainer;
-import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.testcontainers.containers.wait.strategy.Wait;
public class OxiaContainer extends ChaosContainer {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/ProxyContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
similarity index 99%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 3cdb048aea55f..02f0ec9df8dfe 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.containers;
import static java.time.temporal.ChronoUnit.SECONDS;
-
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
similarity index 98%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
index 4251ed3bd57ac..68757d9fecef8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
@@ -25,7 +25,7 @@
import org.testcontainers.containers.Network;
/**
- * Initialize the Pulsar metadata
+ * Initialize the Pulsar metadata.
*/
@Slf4j
public class PulsarInitMetadataContainer extends GenericContainer {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/RabbitMQContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/RabbitMQContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/RabbitMQContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/RabbitMQContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/S3Container.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/S3Container.java
similarity index 98%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/S3Container.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/S3Container.java
index 439b79990f6d9..5ffa5ea65e1ec 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/S3Container.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/S3Container.java
@@ -21,7 +21,7 @@
import lombok.extern.slf4j.Slf4j;
/**
- * S3 simulation container
+ * S3 simulation container.
*/
@Slf4j
public class S3Container extends ChaosContainer {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
similarity index 99%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
index 3a7f679953300..7e0b347640d59 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.containers;
import static java.time.temporal.ChronoUnit.SECONDS;
-
import java.time.Duration;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WebSocketContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/WebSocketContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WebSocketContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/WebSocketContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/ZKContainer.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/package-info.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/package-info.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/package-info.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/containers/package-info.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecException.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/ContainerExecException.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecException.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/ContainerExecException.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
similarity index 81%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
index 1df417863b15e..0e795764c447a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/ContainerExecResult.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.tests.integration.docker;
-import static org.testng.Assert.assertTrue;
import lombok.Data;
/**
@@ -37,12 +36,14 @@ public void assertNoOutput() {
}
public void assertNoStdout() {
- assertTrue(stdout.isEmpty(),
- "stdout should be empty, but was '" + stdout + "'");
+ if (!stdout.isEmpty()) {
+ throw new IllegalArgumentException("stdout should be empty, but was '" + stdout + "'");
+ }
}
public void assertNoStderr() {
- assertTrue(stderr.isEmpty(),
- "stderr should be empty, but was '" + stderr + "'");
+ if (!stderr.isEmpty()) {
+ throw new IllegalArgumentException("stderr should be empty, but was '" + stderr + "'");
+ }
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/package-info.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/package-info.java
new file mode 100644
index 0000000000000..566c3c18d0334
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/docker/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration.docker;
\ No newline at end of file
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/package-info.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/package-info.java
new file mode 100644
index 0000000000000..68c0199d5f32f
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration;
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarCliTestSuite.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarCliTestSuite.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarCliTestSuite.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarCliTestSuite.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarStandaloneTestSuite.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
similarity index 88%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 02c4e3c7ec60d..3b88d394bc9d8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -35,11 +35,13 @@ public final void tearDownAfterClass() throws Exception {
cleanup();
}
- public static void retryStrategically(Predicate predicate, int retryCount, long intSleepTimeInMillis) throws Exception {
+ public static void retryStrategically(Predicate predicate, int retryCount, long intSleepTimeInMillis)
+ throws Exception {
retryStrategically(predicate, retryCount, intSleepTimeInMillis, false);
}
- public static void retryStrategically(Predicate predicate, int retryCount, long intSleepTimeInMillis, boolean throwException)
+ public static void retryStrategically(Predicate predicate, int retryCount, long intSleepTimeInMillis,
+ boolean throwException)
throws Exception {
for (int i = 0; i < retryCount; i++) {
@@ -56,7 +58,7 @@ public static void retryStrategically(Predicate predicate, int retryCount,
}
}
- Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+ Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/package-info.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/package-info.java
new file mode 100644
index 0000000000000..e3c4925aa7352
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/suites/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration.suites;
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java
similarity index 99%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java
index 2426039e58b45..ddfa621303237 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/ClientTestBase.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.tests.integration.topologies;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
@@ -26,11 +29,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
public class ClientTestBase {
private static final int RECEIVE_TIMEOUT_SECONDS = 3;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/FunctionRuntimeType.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/FunctionRuntimeType.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/FunctionRuntimeType.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/FunctionRuntimeType.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
similarity index 98%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 35fb453c4bb8e..56a4c342fcfb1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -25,8 +25,6 @@
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.ZK_PORT;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -34,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.TreeMap;
import java.util.function.Function;
import lombok.Cleanup;
import lombok.Getter;
@@ -43,13 +42,13 @@
import org.apache.pulsar.tests.integration.containers.BKContainer;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
+import org.apache.pulsar.tests.integration.containers.OxiaContainer;
import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.PulsarInitMetadataContainer;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.oxia.OxiaContainer;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -66,7 +65,7 @@ public class PulsarCluster {
public static final String CURL = "/usr/bin/curl";
/**
- * Pulsar Cluster Spec
+ * Pulsar Cluster Spec.
*
* @param spec pulsar cluster spec.
* @return the built pulsar cluster
@@ -151,9 +150,9 @@ private PulsarCluster(PulsarClusterSpec spec, Network network, CSContainer csCon
this.csContainer = csContainer;
- this.bookieContainers = Maps.newTreeMap();
- this.brokerContainers = Maps.newTreeMap();
- this.workerContainers = Maps.newTreeMap();
+ this.bookieContainers = new TreeMap<>();
+ this.brokerContainers = new TreeMap<>();
+ this.workerContainers = new TreeMap<>();
this.proxyContainer = new ProxyContainer(clusterName, appendClusterName(ProxyContainer.NAME), spec.enableTls)
.withNetwork(network)
@@ -440,7 +439,7 @@ public static void stopService(String networkAlias,
private static Map runNumContainers(String serviceName,
int numContainers,
Function containerCreator) {
- Map containers = Maps.newTreeMap();
+ Map containers = new TreeMap<>();
for (int i = 0; i < numContainers; i++) {
String name = "pulsar-" + serviceName + "-" + i;
T container = containerCreator.apply(name);
@@ -632,7 +631,7 @@ public synchronized WorkerContainer getWorker(String workerName) {
}
private T getAnyContainer(Map containers, String serviceName) {
- List containerList = Lists.newArrayList();
+ List containerList = new ArrayList<>();
containerList.addAll(containers.values());
Collections.shuffle(containerList);
checkArgument(!containerList.isEmpty(), "No " + serviceName + " is alive");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
similarity index 94%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index ca45c9b7c9b82..473ae0b458b8b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -21,14 +21,12 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-
import lombok.Builder;
import lombok.Builder.Default;
import lombok.Getter;
import lombok.Setter;
import lombok.Singular;
import lombok.experimental.Accessors;
-
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.testcontainers.containers.GenericContainer;
@@ -82,7 +80,7 @@ public class PulsarClusterSpec {
int numFunctionWorkers = 0;
/**
- * Allow to query the last message
+ * Allow to query the last message.
*/
@Default
boolean queryLastMessage = false;
@@ -96,8 +94,7 @@ public class PulsarClusterSpec {
FunctionRuntimeType functionRuntimeType = FunctionRuntimeType.PROCESS;
/**
- * Returns the list of external services to start with
- * this cluster.
+ * Returns the list of external services to start with this cluster.
*
* @return the list of external services to start with the cluster.
*/
@@ -119,19 +116,19 @@ public class PulsarClusterSpec {
boolean enableContainerLog = false;
/**
- * Provide a map of paths (in the classpath) to mount as volumes inside the containers
+ * Provide a map of paths (in the classpath) to mount as volumes inside the containers.
*/
- @Builder.Default
+ @Default
Map classPathVolumeMounts = new TreeMap<>();
/**
- * Data container
+ * Data container.
*/
- @Builder.Default
+ @Default
GenericContainer> dataContainer = null;
/**
- * Pulsar Test Image Name
+ * Pulsar Test Image Name.
*
* @return the version of the pulsar test image to use
*/
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
similarity index 74%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index 8b99f21373560..56d206538d3a0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -18,28 +18,66 @@
*/
package org.apache.pulsar.tests.integration.topologies;
+import static java.util.stream.Collectors.joining;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.annotations.DataProvider;
-import java.util.stream.Stream;
-
-import static java.util.stream.Collectors.joining;
-
@Slf4j
public abstract class PulsarClusterTestBase extends PulsarTestBase {
+
+ public static final String CLIENT_CONFIG_FILE_PATH_PROPERTY_NAME = "client.config.file.path";
+
protected final Map brokerEnvs = new HashMap<>();
protected final Map bookkeeperEnvs = new HashMap<>();
protected final Map proxyEnvs = new HashMap<>();
protected final List brokerAdditionalPorts = new LinkedList<>();
protected final List bookieAdditionalPorts = new LinkedList<>();
+
+ private Map readClientConfigs(String clientConfFilePath) throws IOException {
+ Properties prop = new Properties(System.getProperties());
+ try (FileInputStream input = new FileInputStream(clientConfFilePath)) {
+ prop.load(input);
+ }
+ Map map = new HashMap<>();
+ for (String key : prop.stringPropertyNames()) {
+ map.put(key, prop.get(key));
+ }
+
+ return map;
+ }
+
+ protected PulsarClient getPulsarClient() throws IOException {
+ var clientConfFilePath = System.getProperty(CLIENT_CONFIG_FILE_PATH_PROPERTY_NAME);
+
+ if (clientConfFilePath == null) {
+ return PulsarClient.builder().serviceUrl(getPulsarCluster().getPlainTextServiceUrl()).build();
+ }
+
+ return PulsarClient.builder().loadConf(readClientConfigs(clientConfFilePath)).build();
+ }
+
+ protected PulsarAdmin getPulsarAdmin() throws IOException {
+ var clientConfFilePath = System.getProperty(CLIENT_CONFIG_FILE_PATH_PROPERTY_NAME);
+
+ if (clientConfFilePath == null) {
+ return PulsarAdmin.builder().serviceHttpUrl(getPulsarCluster().getHttpServiceUrl()).build();
+ }
+ return PulsarAdmin.builder().loadConf(readClientConfigs(clientConfFilePath)).build();
+ }
+
@Override
protected final void setup() throws Exception {
setupCluster();
@@ -52,7 +90,7 @@ protected final void cleanup() throws Exception {
@DataProvider(name = "ServiceUrlAndTopics")
public Object[][] serviceUrlAndTopics() {
- return new Object[][] {
+ return new Object[][]{
// plain text, persistent topic
{
stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()),
@@ -68,7 +106,7 @@ public Object[][] serviceUrlAndTopics() {
@DataProvider(name = "ServiceUrls")
public Object[][] serviceUrls() {
- return new Object[][] {
+ return new Object[][]{
// plain text
{
stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl())
@@ -78,7 +116,7 @@ public Object[][] serviceUrls() {
@DataProvider(name = "ServiceAndAdminUrls")
public Object[][] serviceAndAdminUrls() {
- return new Object[][] {
+ return new Object[][]{
// plain text
{
stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()),
@@ -89,7 +127,7 @@ public Object[][] serviceAndAdminUrls() {
@DataProvider
public Object[][] serviceUrlAndTopicDomain() {
- return new Object[][] {
+ return new Object[][]{
{
stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()),
TopicDomain.persistent
@@ -101,6 +139,18 @@ public Object[][] serviceUrlAndTopicDomain() {
};
}
+ @DataProvider(name = "topicDomain")
+ public Object[][] topicDomain() {
+ return new Object[][]{
+ {
+ TopicDomain.persistent
+ },
+ {
+ TopicDomain.non_persistent
+ },
+ };
+ }
+
protected PulsarAdmin pulsarAdmin;
protected PulsarCluster pulsarCluster;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
similarity index 99%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
index 4099aa06e5ece..fa0b74c7fa516 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
@@ -36,7 +36,7 @@ public class PulsarGeoCluster {
private final PulsarCluster[] clusters;
/**
- * Pulsar Cluster Spec
+ * Pulsar Cluster Spec.
*
* @param specs each pulsar cluster spec.
* @return the built a pulsar cluster with geo replication
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
similarity index 83%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
index 07047c7111121..e473a05d8aa8f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.tests.integration.topologies;
import static java.util.stream.Collectors.joining;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
@Slf4j
public class PulsarGeoClusterTestBase extends PulsarTestBase {
@@ -89,4 +92,15 @@ public void tearDownCluster() throws Exception {
this.geoCluster.stop();
}
}
+
+
+ protected PulsarClient getPulsarClient(PulsarCluster cluster) throws IOException {
+ return PulsarClient.builder().serviceUrl(cluster.getPlainTextServiceUrl()).build();
+ }
+
+ protected PulsarAdmin getPulsarAdmin(PulsarCluster cluster) throws IOException {
+ return PulsarAdmin.builder().serviceHttpUrl(cluster.getHttpServiceUrl())
+ .requestTimeout(30, TimeUnit.SECONDS)
+ .build();
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/package-info.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/package-info.java
new file mode 100644
index 0000000000000..1f387e8087326
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/topologies/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration.topologies;
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
similarity index 100%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
rename to pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/IntegTestUtils.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/IntegTestUtils.java
new file mode 100644
index 0000000000000..8a0877298c139
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/IntegTestUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+@Slf4j
+public class IntegTestUtils {
+ public static String randomName(int numChars) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numChars; i++) {
+ sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+ }
+ return sb.toString();
+ }
+
+ public static String generateNamespaceName() {
+ return "ns-" + randomName(8);
+ }
+
+ public static String generateTopicName(String topicPrefix, boolean isPersistent) {
+ return generateTopicName("default", topicPrefix, isPersistent);
+ }
+
+ public static String generateTopicName(String namespace, String topicPrefix, boolean isPersistent) {
+ String topicName = new StringBuilder(topicPrefix)
+ .append("-")
+ .append(randomName(8))
+ .append("-")
+ .append(System.currentTimeMillis())
+ .toString();
+ if (isPersistent) {
+ return "persistent://public/" + namespace + "/" + topicName;
+ } else {
+ return "non-persistent://public/" + namespace + "/" + topicName;
+ }
+ }
+
+ public static String getNonPartitionedTopic(PulsarAdmin admin, String topicPrefix, boolean isPersistent)
+ throws Exception {
+ String nsName = generateNamespaceName();
+ admin.namespaces().createNamespace("public/" + nsName);
+
+ //TODO: Pass isPersistent, reported in https://github.com/apache/pulsar/issues/23541
+ return generateTopicName(nsName, topicPrefix, true);
+ }
+
+ public static String getPartitionedTopic(PulsarAdmin admin, String topicPrefix, boolean isPersistent,
+ int partitions) throws Exception {
+ if (partitions <= 0) {
+ throw new IllegalArgumentException("partitions must greater than 1");
+ }
+ String nsName = generateNamespaceName();
+ admin.namespaces().createNamespace("public/" + nsName);
+
+ //TODO: Pass isPersistent, reported in https://github.com/apache/pulsar/issues/23541
+ String topicName = generateTopicName(nsName, topicPrefix, true);
+ admin.topics().createPartitionedTopic(topicName, partitions);
+ return topicName;
+ }
+}
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/package-info.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/package-info.java
new file mode 100644
index 0000000000000..480f9b469c0cd
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/integration/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests.integration.utils;
\ No newline at end of file
diff --git a/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/package-info.java b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/package-info.java
new file mode 100644
index 0000000000000..ce2d0c4af3d54
--- /dev/null
+++ b/pulsar-inttest-lib/src/main/java/org/apache/pulsar/tests/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of policies.
+ */
+package org.apache.pulsar.tests;
\ No newline at end of file
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 8bb2ce7c7f9ac..0f1e0f5a7d07b 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -140,11 +140,7 @@
test
-
- com.github.docker-java
- docker-java-core
- test
-
+
org.bouncycastle
bcpkix-jdk18on
@@ -243,7 +239,12 @@
${mongo-reactivestreams.version}
test
-
+
+ org.apache.pulsar
+ pulsar-inttest-client
+ ${project.version}
+ test
+
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
index 0d6b6f1abe4cf..e6a1a4583e269 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
@@ -26,7 +26,7 @@
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +35,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class ClientToolTest extends TopicMessagingBase {
+public class ClientToolTest extends MessagingBase {
private static final int MESSAGE_COUNT = 10;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
index 8c4f3a137aa31..737a583665c23 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
@@ -23,13 +23,13 @@
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-public class PerfToolTest extends TopicMessagingBase {
+public class PerfToolTest extends MessagingBase {
private static final int MESSAGE_COUNT = 50;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b78a832f60933..3579a5e48e876 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -2171,11 +2171,11 @@ private void checkSchemaForAutoSchema(Message message, String bas
}
}
- private PulsarClient getPulsarClient() throws PulsarClientException {
+ protected PulsarClient getPulsarClient() throws PulsarClientException {
return PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
}
- private PulsarAdmin getPulsarAdmin() throws PulsarClientException {
+ protected PulsarAdmin getPulsarAdmin() throws PulsarClientException {
return PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java
index effaef5c19698..189b16c65d993 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java
@@ -18,19 +18,10 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.DeadLetterPolicy;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
-import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
@@ -39,68 +30,20 @@
@Slf4j
public class DelayMessagingTest extends PulsarTestSuite {
- @Test(dataProvider = "ServiceUrls")
- public void delayMsgBlockTest(Supplier serviceUrl) throws Exception {
- String nsName = generateNamespaceName();
- pulsarCluster.createNamespace(nsName);
-
- String topic = generateTopicName(nsName, "testDelayMsgBlock", true);
- pulsarCluster.createPartitionedTopic(topic, 3);
-
- String retryTopic = topic + "-RETRY";
- String deadLetterTopic = topic + "-DLT";
-
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
-
- @Cleanup
- Producer producer = pulsarClient.newProducer()
- .topic(topic)
- .create();
-
- final int redeliverCnt = 10;
- final int delayTimeSeconds = 5;
- @Cleanup
- Consumer consumer = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("test")
- .subscriptionType(SubscriptionType.Shared)
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .enableRetry(true)
- .deadLetterPolicy(DeadLetterPolicy.builder()
- .maxRedeliverCount(redeliverCnt)
- .retryLetterTopic(retryTopic)
- .deadLetterTopic(deadLetterTopic)
- .build())
- .receiverQueueSize(100)
- .ackTimeout(60, TimeUnit.SECONDS)
- .subscribe();
-
- producer.newMessage().value("hello".getBytes()).send();
-
- // receive message at first time
- Message message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
- Assert.assertNotNull(message, "Can't receive message at the first time.");
- consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
-
- // receive retry messages
- for (int i = 0; i < redeliverCnt; i++) {
- message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
- Assert.assertNotNull(message, "Consumer can't receive message in double delayTimeSeconds time "
- + delayTimeSeconds * 2 + "s");
- log.info("receive msg. reConsumeTimes: {}", message.getProperty("RECONSUMETIMES"));
- consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
- }
+ DelayMessaging test;
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new DelayMessaging(getPulsarClient(), getPulsarAdmin());
+ }
- @Cleanup
- Consumer dltConsumer = pulsarClient.newConsumer()
- .topic(deadLetterTopic)
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionName("test")
- .subscribe();
+ @AfterClass(alwaysRun = true)
+ public void closeTest() throws Exception {
+ this.test.close();
+ }
- message = dltConsumer.receive(10, TimeUnit.SECONDS);
- Assert.assertNotNull(message, "Dead letter topic consumer can't receive message.");
+ @Test
+ public void delayMsgBlockTest() throws Exception {
+ test.delayMsgBlockTest();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java
index 863ee22185961..34a3ba77bf349 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java
@@ -18,25 +18,14 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import lombok.Cleanup;
+import java.util.HashMap;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarGeoClusterTestBase;
-import org.awaitility.Awaitility;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
/**
* Geo replication test.
@@ -44,9 +33,19 @@
@Slf4j
public class GeoReplicationTest extends PulsarGeoClusterTestBase {
+ GeoReplication test;
+
@BeforeClass(alwaysRun = true)
public final void setupBeforeClass() throws Exception {
setup();
+ var cluster1 = getGeoCluster().getClusters()[0];
+ var cluster2 = getGeoCluster().getClusters()[1];
+ this.test = new GeoReplication(
+ getPulsarClient(cluster1),
+ getPulsarAdmin(cluster1),
+ getPulsarClient(cluster2),
+ getPulsarAdmin(cluster2)
+ );
}
@Override
@@ -66,63 +65,11 @@ protected PulsarClusterSpec.PulsarClusterSpecBuilder[] beforeSetupCluster (
@AfterClass(alwaysRun = true)
public final void tearDownAfterClass() throws Exception {
cleanup();
+ this.test.close();
}
@Test(timeOut = 1000 * 30, dataProvider = "TopicDomain")
public void testTopicReplication(String domain) throws Exception {
- String cluster1 = getGeoCluster().getClusters()[0].getClusterName();
- String cluster2 = getGeoCluster().getClusters()[1].getClusterName();
-
- @Cleanup
- PulsarAdmin admin = PulsarAdmin.builder()
- .serviceHttpUrl(getGeoCluster().getClusters()[0].getHttpServiceUrl())
- .requestTimeout(30, TimeUnit.SECONDS)
- .build();
-
- String topic = domain + "://public/default/testTopicReplication-" + UUID.randomUUID();
- Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
- try {
- admin.topics().createPartitionedTopic(topic, 10);
- } catch (Exception e) {
- log.error("Failed to create partitioned topic {}.", topic, e);
- Assert.fail("Failed to create partitioned topic " + topic);
- }
- Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 10);
- });
- log.info("Test geo-replication produce and consume for topic {}.", topic);
-
- @Cleanup
- PulsarClient client1 = PulsarClient.builder()
- .serviceUrl(getGeoCluster().getClusters()[0].getPlainTextServiceUrl())
- .build();
-
- @Cleanup
- PulsarClient client2 = PulsarClient.builder()
- .serviceUrl(getGeoCluster().getClusters()[1].getPlainTextServiceUrl())
- .build();
-
- @Cleanup
- Producer p = client1.newProducer()
- .topic(topic)
- .create();
- log.info("Successfully create producer in cluster {} for topic {}.", cluster1, topic);
-
- @Cleanup
- Consumer c = client2.newConsumer()
- .topic(topic)
- .subscriptionName("geo-sub")
- .subscribe();
- log.info("Successfully create consumer in cluster {} for topic {}.", cluster2, topic);
-
- for (int i = 0; i < 10; i++) {
- p.send(String.format("Message [%d]", i).getBytes(StandardCharsets.UTF_8));
- }
- log.info("Successfully produce message to cluster {} for topic {}.", cluster1, topic);
-
- for (int i = 0; i < 10; i++) {
- Message message = c.receive(10, TimeUnit.SECONDS);
- Assert.assertNotNull(message);
- }
- log.info("Successfully consume message from cluster {} for topic {}.", cluster2, topic);
+ test.testTopicReplication(domain);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
index ddedacc531a7c..add3dd966dfe2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingBase.java
@@ -46,7 +46,7 @@ public abstract class MessagingBase extends PulsarTestSuite {
protected String methodName;
@BeforeMethod(alwaysRun = true)
- public void beforeMethod(Method m) throws Exception {
+ public void beforeMethod(Method m) {
methodName = m.getName();
}
@@ -54,6 +54,7 @@ protected String getNonPartitionedTopic(String topicPrefix, boolean isPersistent
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);
+ //TODO: Pass isPersistent, reported in https://github.com/apache/pulsar/issues/23541
return generateTopicName(nsName, topicPrefix, true);
}
@@ -62,6 +63,7 @@ protected String getPartitionedTopic(String topicPrefix, boolean isPersistent, i
String nsName = generateNamespaceName();
pulsarCluster.createNamespace(nsName);
+ //TODO: Pass isPersistent, reported in https://github.com/apache/pulsar/issues/23541
String topicName = generateTopicName(nsName, topicPrefix, true);
pulsarCluster.createPartitionedTopic(topicName, partitions);
return topicName;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
index 618053ac000e2..73cc2824a8fc4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
@@ -18,20 +18,24 @@
*/
package org.apache.pulsar.tests.integration.messaging;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.function.Supplier;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.common.naming.TopicDomain;
import org.testng.ITest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
-public class MessagingSmokeTest extends TopicMessagingBase implements ITest {
+public class MessagingSmokeTest extends MessagingBase implements ITest {
+
+ TopicMessaging test;
@Factory
- public static Object[] messagingTests() {
+ public static Object[] messagingTests() throws IOException {
List> tests = List.of(
new MessagingSmokeTest("Extensible Load Manager",
Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(),
@@ -46,10 +50,21 @@ public static Object[] messagingTests() {
private final String name;
- public MessagingSmokeTest(String name, Map brokerEnvs) {
+ public MessagingSmokeTest(String name, Map brokerEnvs) throws IOException {
super();
this.brokerEnvs.putAll(brokerEnvs);
this.name = name;
+
+ }
+
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new TopicMessaging(getPulsarClient(), getPulsarAdmin());
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void closeTest() throws Exception {
+ this.test.close();
}
@Override
@@ -57,51 +72,51 @@ public String getTestName() {
return name;
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithExclusive(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithExclusive(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithExclusive(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithExclusive(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithFailover(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithFailover(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithFailover(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithFailover(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithShared(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithShared(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithShared(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithShared(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testNonPartitionedTopicMessagingWithKeyShared(TopicDomain topicDomain)
throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.nonPartitionedTopicSendAndReceiveWithKeyShared(TopicDomain.persistent.equals(topicDomain));
}
- @Test(dataProvider = "serviceUrlAndTopicDomain")
- public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain)
+ @Test(dataProvider = "topicDomain")
+ public void testPartitionedTopicMessagingWithKeyShared(TopicDomain topicDomain)
throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain));
+ test.partitionedTopicSendAndReceiveWithKeyShared(TopicDomain.persistent.equals(topicDomain));
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java
index e7eff6a2d2dbf..12f78d8d0b5c0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonDurableConsumerMessagingTest.java
@@ -18,53 +18,26 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.testng.Assert.assertEquals;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-import lombok.Cleanup;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionMode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class NonDurableConsumerMessagingTest extends MessagingBase {
- @Test(dataProvider = "ServiceUrls")
- public void testNonDurableConsumer(Supplier serviceUrl) throws Exception {
- final String topicName = getNonPartitionedTopic("test-non-durable-consumer", false);
- @Cleanup
- final PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
+ NonDurableConsumerMessaging test;
- int numMessages = 20;
-
- try (final Producer producer = client.newProducer()
- .topic(topicName)
- .create()) {
-
- IntStream.range(0, numMessages).forEach(i -> {
- String payload = "message-" + i;
- producer.sendAsync(payload.getBytes(UTF_8));
- });
- // flush the producer to make sure all messages are persisted
- producer.flush();
-
- try (final Consumer consumer = client.newConsumer()
- .topic(topicName)
- .subscriptionName("non-durable-consumer")
- .subscriptionMode(SubscriptionMode.NonDurable)
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe()) {
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new NonDurableConsumerMessaging(getPulsarClient(), getPulsarAdmin());
+ }
- for (int i = 0; i < numMessages; i++) {
- Message msg = consumer.receive();
- assertEquals(new String(msg.getValue(), UTF_8), "message-" + i);
- }
- }
- }
+ @AfterClass(alwaysRun = true)
+ public void closeTest() throws Exception {
+ this.test.close();
+ }
+ @Test
+ public void testNonDurableConsumer() throws Exception {
+ test.testNonDurableConsumer();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
index 0b379af3957c7..17486e4627ba0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/NonPersistentTopicMessagingTest.java
@@ -18,50 +18,63 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-public class NonPersistentTopicMessagingTest extends TopicMessagingBase {
+public class NonPersistentTopicMessagingTest extends MessagingBase {
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), false);
+ TopicMessaging test;
+
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new TopicMessaging(getPulsarClient(), getPulsarAdmin());
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void closeTest() throws Exception {
+ this.test.close();
+ }
+
+ @Test
+ public void testNonPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithExclusive(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.partitionedTopicSendAndReceiveWithExclusive(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), false);
+ @Test
+ public void testNonPartitionedTopicMessagingWithFailover() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithFailover(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithFailover() throws Exception {
+ test.partitionedTopicSendAndReceiveWithFailover(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), false);
+ @Test
+ public void testNonPartitionedTopicMessagingWithShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithShared(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithShared(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), false);
+ @Test
+ public void testNonPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithKeyShared(false);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), false);
+ @Test
+ public void testPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithKeyShared(false);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
index 5675aa88ec6cc..2de06acf737b1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
@@ -18,51 +18,64 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-public class PersistentTopicMessagingTest extends TopicMessagingBase {
+public class PersistentTopicMessagingTest extends MessagingBase {
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), true);
+ TopicMessaging test;
+
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new TopicMessaging(getPulsarClient(), getPulsarAdmin());
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void closeTest() throws Exception {
+ this.test.close();
+ }
+
+ @Test
+ public void testNonPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithExclusive(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithExclusive() throws Exception {
+ test.partitionedTopicSendAndReceiveWithExclusive(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), true);
+ @Test
+ public void testNonPartitionedTopicMessagingWithFailover() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithFailover(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithFailover() throws Exception {
+ test.partitionedTopicSendAndReceiveWithFailover(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), true);
+ @Test
+ public void testNonPartitionedTopicMessagingWithShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithShared(true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithShared( true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), true);
+ @Test
+ public void testNonPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.nonPartitionedTopicSendAndReceiveWithKeyShared( true);
}
- @Test(dataProvider = "ServiceUrls")
- public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl) throws Exception {
- partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), true);
+ @Test
+ public void testPartitionedTopicMessagingWithKeyShared() throws Exception {
+ test.partitionedTopicSendAndReceiveWithKeyShared( true);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java
index 4bfd964e98e23..3b58f22c0111c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java
@@ -18,125 +18,34 @@
*/
package org.apache.pulsar.tests.integration.messaging;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
public class ReaderMessagingTest extends MessagingBase {
- @Test(dataProvider = "ServiceAndAdminUrls")
- public void testReaderReconnectAndRead(Supplier serviceUrl, Supplier adminUrl) throws Exception {
- log.info("-- Starting {} test --", methodName);
- final String topicName = getNonPartitionedTopic("test-reader-reconnect-read", false);
- @Cleanup final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl.get())
- .build();
- @Cleanup final Reader reader = client.newReader(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- // Here we need to make sure that setting the startMessageId should not cause a change in the
- // behavior of the reader under non.
- .startMessageId(MessageId.earliest)
- .create();
+ ReaderMessaging test;
- final int messagesToSend = 10;
- @Cleanup final Producer producer = client.newProducer(Schema.STRING)
- .topic(topicName)
- .enableBatching(false)
- .create();
- for (int i = 0; i < messagesToSend; i++) {
- MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
- }
-
- for (int i = 0; i < messagesToSend; i++) {
- Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
- }
-
- @Cleanup
- PulsarAdmin admin = PulsarAdmin.builder()
- .serviceHttpUrl(adminUrl.get())
- .build();
-
- admin.topics().unload(topicName);
-
- for (int i = 0; i < messagesToSend; i++) {
- MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
- }
+ @BeforeClass(alwaysRun = true)
+ public void setupTest() throws Exception {
+ this.test = new ReaderMessaging(getPulsarClient(), getPulsarAdmin());
+ }
- for (int i = 0; i < messagesToSend; i++) {
- Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
- }
+ @AfterClass(alwaysRun = true)
+ public void closeTest() throws Exception {
+ this.test.close();
+ }
- log.info("-- Exiting {} test --", methodName);
+ @Test
+ public void testReaderReconnectAndRead() throws Exception {
+ test.testReaderReconnectAndRead();
}
- @Test(dataProvider = "ServiceAndAdminUrls")
- public void testReaderReconnectAndReadBatchMessages(Supplier serviceUrl, Supplier adminUrl)
+ @Test
+ public void testReaderReconnectAndReadBatchMessages()
throws Exception {
- log.info("-- Starting {} test --", methodName);
- final String topicName = getNonPartitionedTopic("test-reader-reconnect-read-batch", false);
- @Cleanup final PulsarClient client = PulsarClient.builder()
- .serviceUrl(serviceUrl.get())
- .build();
- @Cleanup final Reader reader = client.newReader(Schema.STRING)
- .topic(topicName)
- .subscriptionName("test-sub")
- // Here we need to make sure that setting the startMessageId should not cause a change in the
- // behavior of the reader under non.
- .startMessageId(MessageId.earliest)
- .create();
-
- final int messagesToSend = 10;
- @Cleanup final Producer producer = client.newProducer(Schema.STRING)
- .topic(topicName)
- .enableBatching(true)
- .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
- .batchingMaxMessages(5)
- .create();
-
- for (int i = 0; i < messagesToSend; i++) {
- MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
- }
-
- for (int i = 0; i < messagesToSend; i++) {
- Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
- }
-
- @Cleanup
- PulsarAdmin admin = PulsarAdmin.builder()
- .serviceHttpUrl(adminUrl.get())
- .build();
-
- admin.topics().unload(topicName);
-
- for (int i = 0; i < messagesToSend; i++) {
- MessageId messageId = producer.newMessage().value("message-" + i).send();
- assertNotNull(messageId);
- }
-
- for (int i = 0; i < messagesToSend; i++) {
- Message msg = reader.readNext();
- assertEquals(msg.getValue(), "message-" + i);
- }
-
- log.info("-- Exiting {} test --", methodName);
+ test.testReaderReconnectAndReadBatchMessages();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
index 98000c6f40636..7591d3697d84d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java
@@ -29,14 +29,14 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-public class TestBrokerInterceptors extends TopicMessagingBase {
+public class TestBrokerInterceptors extends MessagingBase {
private static final String PREFIX = "PULSAR_PREFIX_";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java
index f41551b248db0..5e544738868c1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestEntryFilters.java
@@ -28,12 +28,12 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.function.Supplier;
-public class TestEntryFilters extends TopicMessagingBase {
+public class TestEntryFilters extends MessagingBase {
private static final String PREFIX = "PULSAR_PREFIX_";
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index a34670267dc2a..0169ce61cd05e 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -30,7 +30,6 @@
-