Skip to content

Commit ac0de68

Browse files
committed
Added coverage for default subscription type fallback.
1 parent be63691 commit ac0de68

File tree

2 files changed

+121
-0
lines changed

2 files changed

+121
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#Custom Topics
2+
my.custom.topic.name=custom-topic-name-from-app-prop
3+
my.custom.subscription.name=custom-subscription-name-from-app-prop
4+
my.custom.consumer.name=custom-consumer-name-from-app-prop
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.github.majusko.pulsar;
2+
3+
import io.github.majusko.pulsar.collector.ConsumerCollector;
4+
import io.github.majusko.pulsar.collector.ConsumerHolder;
5+
import io.github.majusko.pulsar.constant.Serialization;
6+
import io.github.majusko.pulsar.consumer.ConsumerAggregator;
7+
import io.github.majusko.pulsar.msg.AvroMsg;
8+
import io.github.majusko.pulsar.msg.MyMsg;
9+
import io.github.majusko.pulsar.msg.ProtoMsg;
10+
import io.github.majusko.pulsar.producer.ProducerFactory;
11+
import io.github.majusko.pulsar.producer.PulsarTemplate;
12+
import io.github.majusko.pulsar.utils.UrlBuildService;
13+
import org.apache.commons.lang3.tuple.ImmutablePair;
14+
import org.apache.pulsar.client.api.Consumer;
15+
import org.apache.pulsar.client.api.PulsarClientException;
16+
import org.apache.pulsar.client.api.SubscriptionType;
17+
import org.apache.pulsar.client.impl.ConsumerBase;
18+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
19+
import org.junit.jupiter.api.Assertions;
20+
import org.junit.jupiter.api.Test;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.beans.factory.annotation.Value;
23+
import org.springframework.boot.test.context.SpringBootTest;
24+
import org.springframework.context.annotation.Import;
25+
import org.springframework.test.context.ActiveProfiles;
26+
import org.springframework.test.context.DynamicPropertyRegistry;
27+
import org.springframework.test.context.DynamicPropertySource;
28+
import org.testcontainers.containers.PulsarContainer;
29+
import org.testcontainers.junit.jupiter.Container;
30+
import org.testcontainers.junit.jupiter.Testcontainers;
31+
import org.testcontainers.utility.DockerImageName;
32+
import reactor.core.Disposable;
33+
34+
import java.lang.reflect.Field;
35+
import java.nio.charset.StandardCharsets;
36+
import java.time.Duration;
37+
import java.util.*;
38+
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.stream.Collectors;
40+
41+
import static org.awaitility.Awaitility.await;
42+
43+
@ActiveProfiles("default-sub-type")
44+
@SpringBootTest
45+
@Import({TestProducerConfiguration.class, TestConsumers.class})
46+
@Testcontainers
47+
class DefaultSubscriptionTypeTests {
48+
49+
@Autowired
50+
private ConsumerAggregator consumerAggregator;
51+
52+
@Autowired
53+
private PulsarTemplate<MyMsg> producer;
54+
55+
@Autowired
56+
private TestConsumers testConsumers;
57+
58+
@Autowired
59+
private UrlBuildService urlBuildService;
60+
61+
@Container
62+
static PulsarContainer pulsarContainer = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:latest"));
63+
64+
public static final String VALIDATION_STRING = "validation-string";
65+
66+
@DynamicPropertySource
67+
static void propertySettings(DynamicPropertyRegistry registry) {
68+
registry.add("pulsar.serviceUrl", pulsarContainer::getPulsarBrokerUrl);
69+
}
70+
71+
@Test
72+
void testProducerSendMethod() throws PulsarClientException {
73+
producer.send("topic-one", new MyMsg("bb"));
74+
75+
await().untilTrue(testConsumers.mockTopicListenerReceived);
76+
}
77+
78+
@Test
79+
void sharedSubscriptionOverride() throws Exception {
80+
final ConsumerBase<?> consumer = (ConsumerBase<?>) consumerAggregator.getConsumers().stream()
81+
.filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl(TestConsumers.SHARED_SUB_TEST)))
82+
.findFirst()
83+
.orElseThrow(() -> new Exception("Missing tested consumer."));
84+
85+
final Field f = ConsumerBase.class.getDeclaredField("conf");
86+
87+
f.setAccessible(true);
88+
89+
final ConsumerConfigurationData<?> conf = (ConsumerConfigurationData<?>) f.get(consumer);
90+
91+
Assertions.assertEquals(urlBuildService.buildTopicUrl(TestConsumers.SHARED_SUB_TEST), consumer.getTopic());
92+
Assertions.assertEquals(SubscriptionType.Shared, conf.getSubscriptionType());
93+
94+
producer.send(TestConsumers.SHARED_SUB_TEST, new MyMsg(VALIDATION_STRING));
95+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
96+
}
97+
98+
@Test
99+
void exclusiveSubscriptionOverride() throws Exception {
100+
final ConsumerBase<?> consumer = (ConsumerBase<?>) consumerAggregator.getConsumers().stream()
101+
.filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST)))
102+
.findFirst()
103+
.orElseThrow(() -> new Exception("Missing tested consumer."));
104+
105+
final Field f = ConsumerBase.class.getDeclaredField("conf");
106+
107+
f.setAccessible(true);
108+
109+
final ConsumerConfigurationData<?> conf = (ConsumerConfigurationData<?>) f.get(consumer);
110+
111+
Assertions.assertEquals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST), consumer.getTopic());
112+
Assertions.assertEquals(SubscriptionType.Exclusive, conf.getSubscriptionType());
113+
114+
producer.send(TestConsumers.EXCLUSIVE_SUB_TEST, new MyMsg(VALIDATION_STRING));
115+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
116+
}
117+
}

0 commit comments

Comments
 (0)