Skip to content

Commit f806dd5

Browse files
authored
Merge pull request #161 from majusko/feature/issue-155-default-override-to-properties
Feature/issue 155 default override to properties
2 parents c31ffb3 + ac0de68 commit f806dd5

File tree

11 files changed

+253
-10
lines changed

11 files changed

+253
-10
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ pulsar.token-auth-value=43th4398gh340gf34gj349gh304ghryj34fh
189189
#### Consumer
190190
- `pulsar.consumer.default.dead-letter-policy-max-redeliver-count` - How many times should pulsar try to retry sending the message to consumer.
191191
- `pulsar.consumer.default.ack-timeout-ms` - How soon should be the message acked and how soon will dead letter mechanism try to retry to send the message.
192+
- `pulsar.consumer.default.subscription-type` - By default all subscriptions are `Exclusive`. You can override this default value here globally or set individualy in each `@PulsarConsumer` annotation.
192193

193194
### Additional usages
194195

src/main/java/io/github/majusko/pulsar/annotation/PulsarConsumer.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,23 @@
1717

1818
Serialization serialization() default Serialization.JSON;
1919

20-
SubscriptionType subscriptionType() default SubscriptionType.Exclusive;
20+
/**
21+
* Type of subscription.
22+
*
23+
* Shared - This will allow you to have multiple consumers/instances of the application in a cluster with same subscription
24+
* name and guarantee that the message is read only by one consumer.
25+
*
26+
* Exclusive - message will be delivered to every subscription name only once but won't allow to instantiate multiple
27+
* instances or consumers of the same subscription name. With a default configuration you don't need to worry about horizontal
28+
* scaling because message will be delivered to each pod in a cluster since in case of exclusive subscription
29+
* the name is unique per instance and can be nicely used to update state of each pod in case your service
30+
* is stateful (For example - you need to update in-memory cached configuration for each instance of authorization microservice).
31+
*
32+
* By default the type is `Exclusive` but you can also override the default in `application.properties`.
33+
* This can be handy in case you are using `Shared` subscription in your application all the time and you
34+
* don't want to override this value every time you use `@PulsarConsumer`.
35+
*/
36+
SubscriptionType[] subscriptionType() default {};
2137

2238
/**
2339
* (Optional) Consumer names are auto-generated but in case you wish to use your custom consumer names,

src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package io.github.majusko.pulsar.consumer;
22

3+
import com.google.common.base.Strings;
34
import io.github.majusko.pulsar.PulsarMessage;
45
import io.github.majusko.pulsar.collector.ConsumerCollector;
56
import io.github.majusko.pulsar.collector.ConsumerHolder;
67
import io.github.majusko.pulsar.error.FailedMessage;
8+
import io.github.majusko.pulsar.error.exception.ClientInitException;
79
import io.github.majusko.pulsar.error.exception.ConsumerInitException;
810
import io.github.majusko.pulsar.properties.ConsumerProperties;
911
import io.github.majusko.pulsar.utils.SchemaUtils;
@@ -20,6 +22,7 @@
2022
import reactor.util.concurrent.Queues;
2123

2224
import java.lang.reflect.Method;
25+
import java.util.Arrays;
2326
import java.util.List;
2427
import java.util.concurrent.TimeUnit;
2528
import java.util.stream.Collectors;
@@ -33,6 +36,7 @@ public class ConsumerAggregator implements EmbeddedValueResolverAware {
3336
private final PulsarClient pulsarClient;
3437
private final ConsumerProperties consumerProperties;
3538
private final UrlBuildService urlBuildService;
39+
private final static SubscriptionType DEFAULT_SUBSCRIPTION_TYPE = SubscriptionType.Exclusive;
3640

3741
private StringValueResolver stringValueResolver;
3842
private List<Consumer> consumers;
@@ -46,7 +50,7 @@ public ConsumerAggregator(ConsumerCollector consumerCollector, PulsarClient puls
4650
}
4751

4852
@EventListener(ApplicationReadyEvent.class)
49-
public void init() {
53+
public void init() throws ClientInitException {
5054
consumers = consumerCollector.getConsumers().entrySet().stream()
5155
.map(holder -> subscribe(holder.getKey(), holder.getValue()))
5256
.collect(Collectors.toList());
@@ -57,15 +61,14 @@ private Consumer<?> subscribe(String generatedConsumerName, ConsumerHolder holde
5761
final String consumerName = stringValueResolver.resolveStringValue(holder.getAnnotation().consumerName());
5862
final String subscriptionName = stringValueResolver.resolveStringValue(holder.getAnnotation().subscriptionName());
5963
final String topicName = stringValueResolver.resolveStringValue(holder.getAnnotation().topic());
64+
final SubscriptionType subscriptionType = getSubscriptionType(holder);
6065
final ConsumerBuilder<?> consumerBuilder = pulsarClient
6166
.newConsumer(SchemaUtils.getSchema(holder.getAnnotation().serialization(),
6267
holder.getAnnotation().clazz()))
63-
.consumerName(urlBuildService
64-
.buildPulsarConsumerName(consumerName, generatedConsumerName))
65-
.subscriptionName(urlBuildService
66-
.buildPulsarSubscriptionName(subscriptionName, generatedConsumerName))
68+
.consumerName(urlBuildService.buildPulsarConsumerName(consumerName, generatedConsumerName))
69+
.subscriptionName(urlBuildService.buildPulsarSubscriptionName(subscriptionName, generatedConsumerName))
6770
.topic(urlBuildService.buildTopicUrl(topicName))
68-
.subscriptionType(holder.getAnnotation().subscriptionType())
71+
.subscriptionType(subscriptionType)
6972
.messageListener((consumer, msg) -> {
7073
try {
7174
final Method method = holder.getHandler();
@@ -91,11 +94,29 @@ private Consumer<?> subscribe(String generatedConsumerName, ConsumerHolder holde
9194
buildDeadLetterPolicy(holder, consumerBuilder);
9295

9396
return consumerBuilder.subscribe();
94-
} catch (PulsarClientException e) {
97+
} catch (PulsarClientException | ClientInitException e) {
9598
throw new ConsumerInitException("Failed to init consumer.", e);
9699
}
97100
}
98101

102+
private SubscriptionType getSubscriptionType(ConsumerHolder holder) throws ClientInitException {
103+
SubscriptionType subscriptionType = Arrays.stream(holder.getAnnotation().subscriptionType()).findFirst().orElse(null);
104+
105+
String aa = consumerProperties.getSubscriptionType();
106+
107+
if (subscriptionType == null && Strings.isNullOrEmpty(consumerProperties.getSubscriptionType())) {
108+
subscriptionType = DEFAULT_SUBSCRIPTION_TYPE;
109+
} else if (subscriptionType == null && !Strings.isNullOrEmpty(consumerProperties.getSubscriptionType())) {
110+
try {
111+
subscriptionType = SubscriptionType.valueOf(consumerProperties.getSubscriptionType());
112+
} catch (IllegalArgumentException exception) {
113+
throw new ClientInitException("There was unknown SubscriptionType.", exception);
114+
}
115+
}
116+
117+
return subscriptionType;
118+
}
119+
99120
public void buildDeadLetterPolicy(ConsumerHolder holder, ConsumerBuilder<?> consumerBuilder) {
100121
DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterBuilder = null;
101122

src/main/java/io/github/majusko/pulsar/error/exception/ClientInitException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,8 @@ public class ClientInitException extends IOException {
66
public ClientInitException(String message) {
77
super(message);
88
}
9+
10+
public ClientInitException(String message, Throwable cause) {
11+
super(message, cause);
12+
}
913
}

src/main/java/io/github/majusko/pulsar/properties/ConsumerProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
public class ConsumerProperties {
77
int deadLetterPolicyMaxRedeliverCount = -1;
88
int ackTimeoutMs = 0;
9+
String subscriptionType = "";
910

1011
public int getDeadLetterPolicyMaxRedeliverCount() {
1112
return deadLetterPolicyMaxRedeliverCount;
@@ -22,4 +23,12 @@ public int getAckTimeoutMs() {
2223
public void setAckTimeoutMs(int ackTimeoutMs) {
2324
this.ackTimeoutMs = ackTimeoutMs;
2425
}
26+
27+
public String getSubscriptionType() {
28+
return subscriptionType;
29+
}
30+
31+
public void setSubscriptionType(String subscriptionType) {
32+
this.subscriptionType = subscriptionType;
33+
}
2534
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#Custom Topics
2+
my.custom.topic.name=custom-topic-name-from-app-prop
3+
my.custom.subscription.name=custom-subscription-name-from-app-prop
4+
my.custom.consumer.name=custom-consumer-name-from-app-prop

src/main/resources/application-test.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pulsar.consumer.default.ack-timeout-ms=1000
22
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=1
3+
pulsar.consumer.default.subscription-type=Shared
34

45
#Custom Topics
56
my.custom.topic.name=custom-topic-name-from-app-prop
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.github.majusko.pulsar;
2+
3+
import io.github.majusko.pulsar.collector.ConsumerCollector;
4+
import io.github.majusko.pulsar.collector.ConsumerHolder;
5+
import io.github.majusko.pulsar.constant.Serialization;
6+
import io.github.majusko.pulsar.consumer.ConsumerAggregator;
7+
import io.github.majusko.pulsar.msg.AvroMsg;
8+
import io.github.majusko.pulsar.msg.MyMsg;
9+
import io.github.majusko.pulsar.msg.ProtoMsg;
10+
import io.github.majusko.pulsar.producer.ProducerFactory;
11+
import io.github.majusko.pulsar.producer.PulsarTemplate;
12+
import io.github.majusko.pulsar.utils.UrlBuildService;
13+
import org.apache.commons.lang3.tuple.ImmutablePair;
14+
import org.apache.pulsar.client.api.Consumer;
15+
import org.apache.pulsar.client.api.PulsarClientException;
16+
import org.apache.pulsar.client.api.SubscriptionType;
17+
import org.apache.pulsar.client.impl.ConsumerBase;
18+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
19+
import org.junit.jupiter.api.Assertions;
20+
import org.junit.jupiter.api.Test;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.beans.factory.annotation.Value;
23+
import org.springframework.boot.test.context.SpringBootTest;
24+
import org.springframework.context.annotation.Import;
25+
import org.springframework.test.context.ActiveProfiles;
26+
import org.springframework.test.context.DynamicPropertyRegistry;
27+
import org.springframework.test.context.DynamicPropertySource;
28+
import org.testcontainers.containers.PulsarContainer;
29+
import org.testcontainers.junit.jupiter.Container;
30+
import org.testcontainers.junit.jupiter.Testcontainers;
31+
import org.testcontainers.utility.DockerImageName;
32+
import reactor.core.Disposable;
33+
34+
import java.lang.reflect.Field;
35+
import java.nio.charset.StandardCharsets;
36+
import java.time.Duration;
37+
import java.util.*;
38+
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.stream.Collectors;
40+
41+
import static org.awaitility.Awaitility.await;
42+
43+
@ActiveProfiles("default-sub-type")
44+
@SpringBootTest
45+
@Import({TestProducerConfiguration.class, TestConsumers.class})
46+
@Testcontainers
47+
class DefaultSubscriptionTypeTests {
48+
49+
@Autowired
50+
private ConsumerAggregator consumerAggregator;
51+
52+
@Autowired
53+
private PulsarTemplate<MyMsg> producer;
54+
55+
@Autowired
56+
private TestConsumers testConsumers;
57+
58+
@Autowired
59+
private UrlBuildService urlBuildService;
60+
61+
@Container
62+
static PulsarContainer pulsarContainer = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:latest"));
63+
64+
public static final String VALIDATION_STRING = "validation-string";
65+
66+
@DynamicPropertySource
67+
static void propertySettings(DynamicPropertyRegistry registry) {
68+
registry.add("pulsar.serviceUrl", pulsarContainer::getPulsarBrokerUrl);
69+
}
70+
71+
@Test
72+
void testProducerSendMethod() throws PulsarClientException {
73+
producer.send("topic-one", new MyMsg("bb"));
74+
75+
await().untilTrue(testConsumers.mockTopicListenerReceived);
76+
}
77+
78+
@Test
79+
void sharedSubscriptionOverride() throws Exception {
80+
final ConsumerBase<?> consumer = (ConsumerBase<?>) consumerAggregator.getConsumers().stream()
81+
.filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl(TestConsumers.SHARED_SUB_TEST)))
82+
.findFirst()
83+
.orElseThrow(() -> new Exception("Missing tested consumer."));
84+
85+
final Field f = ConsumerBase.class.getDeclaredField("conf");
86+
87+
f.setAccessible(true);
88+
89+
final ConsumerConfigurationData<?> conf = (ConsumerConfigurationData<?>) f.get(consumer);
90+
91+
Assertions.assertEquals(urlBuildService.buildTopicUrl(TestConsumers.SHARED_SUB_TEST), consumer.getTopic());
92+
Assertions.assertEquals(SubscriptionType.Shared, conf.getSubscriptionType());
93+
94+
producer.send(TestConsumers.SHARED_SUB_TEST, new MyMsg(VALIDATION_STRING));
95+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
96+
}
97+
98+
@Test
99+
void exclusiveSubscriptionOverride() throws Exception {
100+
final ConsumerBase<?> consumer = (ConsumerBase<?>) consumerAggregator.getConsumers().stream()
101+
.filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST)))
102+
.findFirst()
103+
.orElseThrow(() -> new Exception("Missing tested consumer."));
104+
105+
final Field f = ConsumerBase.class.getDeclaredField("conf");
106+
107+
f.setAccessible(true);
108+
109+
final ConsumerConfigurationData<?> conf = (ConsumerConfigurationData<?>) f.get(consumer);
110+
111+
Assertions.assertEquals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST), consumer.getTopic());
112+
Assertions.assertEquals(SubscriptionType.Exclusive, conf.getSubscriptionType());
113+
114+
producer.send(TestConsumers.EXCLUSIVE_SUB_TEST, new MyMsg(VALIDATION_STRING));
115+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
116+
}
117+
}

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
import org.apache.commons.lang3.tuple.ImmutablePair;
1414
import org.apache.pulsar.client.api.Consumer;
1515
import org.apache.pulsar.client.api.PulsarClientException;
16+
import org.apache.pulsar.client.api.SubscriptionType;
17+
import org.apache.pulsar.client.impl.ConsumerBase;
18+
import org.apache.pulsar.client.impl.ConsumerImpl;
19+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
1620
import org.junit.jupiter.api.Assertions;
1721
import org.junit.jupiter.api.Test;
1822
import org.springframework.beans.factory.annotation.Autowired;
@@ -28,6 +32,7 @@
2832
import org.testcontainers.utility.DockerImageName;
2933
import reactor.core.Disposable;
3034

35+
import java.lang.reflect.Field;
3136
import java.nio.charset.StandardCharsets;
3237
import java.time.Duration;
3338
import java.util.*;
@@ -133,7 +138,7 @@ void testProducerCreateMessageMethod() throws PulsarClientException {
133138
void testConsumerRegistration1() throws Exception {
134139
final List<Consumer> consumers = consumerAggregator.getConsumers();
135140

136-
Assertions.assertEquals(14, consumers.size());
141+
Assertions.assertEquals(16, consumers.size());
137142

138143
final Consumer<?> consumer =
139144
consumers.stream().filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl("topic-one"))).findFirst().orElseThrow(Exception::new);
@@ -164,7 +169,7 @@ void testProducerRegistration() {
164169

165170
final Map<String, ImmutablePair<Class<?>, Serialization>> topics = producerFactory.getTopics();
166171

167-
Assertions.assertEquals(14, topics.size());
172+
Assertions.assertEquals(16, topics.size());
168173

169174
final Set<String> topicNames = new HashSet<>(topics.keySet());
170175

@@ -259,4 +264,44 @@ void consumerNamesOverrideWithSpELSupportTest() throws Exception {
259264
producer.send(TestConsumers.CUSTOM_SUB_AND_CONSUMER_TOPIC, new MyMsg(VALIDATION_STRING));
260265
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToCustomSpElConsumerAndSubConfig.get());
261266
}
267+
268+
@Test
269+
void sharedSubscriptionOverride() throws Exception {
270+
final ConsumerBase<?> consumer = (ConsumerBase<?>) consumerAggregator.getConsumers().stream()
271+
.filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl(TestConsumers.SHARED_SUB_TEST)))
272+
.findFirst()
273+
.orElseThrow(() -> new Exception("Missing tested consumer."));
274+
275+
final Field f = ConsumerBase.class.getDeclaredField("conf");
276+
277+
f.setAccessible(true);
278+
279+
final ConsumerConfigurationData<?> conf = (ConsumerConfigurationData<?>) f.get(consumer);
280+
281+
Assertions.assertEquals(urlBuildService.buildTopicUrl(TestConsumers.SHARED_SUB_TEST), consumer.getTopic());
282+
Assertions.assertEquals(SubscriptionType.Shared, conf.getSubscriptionType());
283+
284+
producer.send(TestConsumers.SHARED_SUB_TEST, new MyMsg(VALIDATION_STRING));
285+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
286+
}
287+
288+
@Test
289+
void exclusiveSubscriptionOverride() throws Exception {
290+
final ConsumerBase<?> consumer = (ConsumerBase<?>) consumerAggregator.getConsumers().stream()
291+
.filter($ -> $.getTopic().equals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST)))
292+
.findFirst()
293+
.orElseThrow(() -> new Exception("Missing tested consumer."));
294+
295+
final Field f = ConsumerBase.class.getDeclaredField("conf");
296+
297+
f.setAccessible(true);
298+
299+
final ConsumerConfigurationData<?> conf = (ConsumerConfigurationData<?>) f.get(consumer);
300+
301+
Assertions.assertEquals(urlBuildService.buildTopicUrl(TestConsumers.EXCLUSIVE_SUB_TEST), consumer.getTopic());
302+
Assertions.assertEquals(SubscriptionType.Exclusive, conf.getSubscriptionType());
303+
304+
producer.send(TestConsumers.EXCLUSIVE_SUB_TEST, new MyMsg(VALIDATION_STRING));
305+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToSharedTopicSubscription.get());
306+
}
262307
}

0 commit comments

Comments
 (0)