Skip to content

Commit be63691

Browse files
committed
Added test coverage for subscription type overriding.
1 parent 674c60e commit be63691

File tree

5 files changed

+37
-3
lines changed

5 files changed

+37
-3
lines changed

src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ private Consumer<?> subscribe(String generatedConsumerName, ConsumerHolder holde
102102
private SubscriptionType getSubscriptionType(ConsumerHolder holder) throws ClientInitException {
103103
SubscriptionType subscriptionType = Arrays.stream(holder.getAnnotation().subscriptionType()).findFirst().orElse(null);
104104

105+
String aa = consumerProperties.getSubscriptionType();
106+
105107
if (subscriptionType == null && Strings.isNullOrEmpty(consumerProperties.getSubscriptionType())) {
106108
subscriptionType = DEFAULT_SUBSCRIPTION_TYPE;
107109
} else if (subscriptionType == null && !Strings.isNullOrEmpty(consumerProperties.getSubscriptionType())) {

src/main/resources/application-test.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
pulsar.consumer.default.ack-timeout-ms=1000
22
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=1
3-
pulsar.consmer.default.subscription-type=Shared
3+
pulsar.consumer.default.subscription-type=Shared
44

55
#Custom Topics
66
my.custom.topic.name=custom-topic-name-from-app-prop

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ void testProducerCreateMessageMethod() throws PulsarClientException {
138138
void testConsumerRegistration1() throws Exception {
139139
final List<Consumer> consumers = consumerAggregator.getConsumers();
140140

141-
Assertions.assertEquals(14, consumers.size());
141+
Assertions.assertEquals(16, consumers.size());
142142

143143
final Consumer<?> consumer =
144144
consumers.stream().filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl("topic-one"))).findFirst().orElseThrow(Exception::new);
@@ -169,7 +169,7 @@ void testProducerRegistration() {
169169

170170
final Map<String, ImmutablePair<Class<?>, Serialization>> topics = producerFactory.getTopics();
171171

172-
Assertions.assertEquals(14, topics.size());
172+
Assertions.assertEquals(16, topics.size());
173173

174174
final Set<String> topicNames = new HashSet<>(topics.keySet());
175175

@@ -284,4 +284,24 @@ void sharedSubscriptionOverride() throws Exception {
284284
producer.send(TestConsumers.SHARED_SUB_TEST, new MyMsg(VALIDATION_STRING));
285285
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
286286
}
287+
288+
@Test
289+
void exclusiveSubscriptionOverride() throws Exception {
290+
final ConsumerBase<?> consumer = (ConsumerBase<?>) consumerAggregator.getConsumers().stream()
291+
.filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST)))
292+
.findFirst()
293+
.orElseThrow(() -> new Exception("Missing tested consumer."));
294+
295+
final Field f = ConsumerBase.class.getDeclaredField("conf");
296+
297+
f.setAccessible(true);
298+
299+
final ConsumerConfigurationData<?> conf = (ConsumerConfigurationData<?>) f.get(consumer);
300+
301+
Assertions.assertEquals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST), consumer.getTopic());
302+
Assertions.assertEquals(SubscriptionType.Exclusive, conf.getSubscriptionType());
303+
304+
producer.send(TestConsumers.EXCLUSIVE_SUB_TEST, new MyMsg(VALIDATION_STRING));
305+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
306+
}
287307
}

src/test/java/io/github/majusko/pulsar/TestConsumers.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class TestConsumers {
3737
public static final String CUSTOM_CONSUMER_TOPIC = "custom-consumer-topic";
3838
public static final String CUSTOM_SUB_AND_CONSUMER_TOPIC = "custom-sub-and-consumer";
3939
public static final String SHARED_SUB_TEST = "shared-sub-consumer";
40+
public static final String EXCLUSIVE_SUB_TEST = "exclusive-sub-consumer";
4041

4142
@PulsarConsumer(topic = "topic-one", clazz = MyMsg.class, serialization = Serialization.JSON)
4243
public void topicOneListener(MyMsg myMsg) {
@@ -167,4 +168,14 @@ public void sharedTopicSubscription(MyMsg myMsg) {
167168
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, myMsg.getData());
168169
subscribeToSharedTopicSubscription.set(true);
169170
}
171+
172+
@PulsarConsumer(
173+
topic = EXCLUSIVE_SUB_TEST,
174+
clazz = MyMsg.class,
175+
subscriptionType = SubscriptionType.Exclusive)
176+
public void exclusiveTopicSubscription(MyMsg myMsg) {
177+
Assertions.assertNotNull(myMsg);
178+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, myMsg.getData());
179+
subscribeToSharedTopicSubscription.set(true);
180+
}
170181
}

src/test/java/io/github/majusko/pulsar/TestProducerConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public ProducerFactory producerFactory() {
2929
.addProducer("${my.custom.topic.name}", MyMsg.class)
3030
.addProducer(TestConsumers.CUSTOM_SUB_AND_CONSUMER_TOPIC, MyMsg.class)
3131
.addProducer(TestConsumers.SHARED_SUB_TEST, MyMsg.class)
32+
.addProducer(TestConsumers.EXCLUSIVE_SUB_TEST, MyMsg.class)
3233
.addProducer(TestConsumers.CUSTOM_CONSUMER_TOPIC, MyMsg.class);
3334
}
3435
}

0 commit comments

Comments
 (0)