Skip to content

Commit c4eb2d2

Browse files
authored
Merge pull request #81 from majusko/feature/topic-name-fix-tenancy-namespaces
Implemented the proper way of topic name creation with a support for …
2 parents 023d366 + 11575a9 commit c4eb2d2

File tree

10 files changed

+83
-23
lines changed

10 files changed

+83
-23
lines changed

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,18 @@ pulsar.operation-timeout-sec=15
9090
pulsar.starting-backoff-interval-ms=100
9191
pulsar.max-backoff-interval-sec=10
9292
pulsar.consumer-name-delimiter=
93+
pulsar.namespace=default
94+
pulsar.tenant=public
9395

9496
#Consumer
9597
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=-1
9698
pulsar.consumer.default.ack-timeout-ms=30
9799

98100
```
99101

100-
Properties explained:
102+
###Properties explained:
101103

104+
####PulsarClient
102105
- `pulsar.service-url` - URL used to connect to pulsar cluster.
103106
- `pulsar.io-threads` - Number of threads to be used for handling connections to brokers.
104107
- `pulsar.listener-threads` - Set the number of threads to be used for message listeners/subscribers.
@@ -109,6 +112,10 @@ Properties explained:
109112
- `pulsar.starting-backoff-interval-ms` - Duration of time for a backoff interval (Retry algorithm).
110113
- `pulsar.max-backoff-interval-sec` - The maximum duration of time for a backoff interval (Retry algorithm).
111114
- `pulsar.consumer-name-delimiter` - Consumer names are connection of bean name and method with a delimiter. By default, there is no delimiter and words are connected together.
115+
- `pulsar.namespace` - Namespace separation. For example: app1/app2 OR dev/staging/prod. More in [Namespaces docs](https://pulsar.apache.org/docs/en/concepts-messaging/#namespaces).
116+
- `pulsar.tenant` - Pulsar multi-tenancy support. More in [Multi Tenancy docs](https://pulsar.apache.org/docs/en/concepts-multi-tenancy/).
117+
118+
####Consumer
112119
- `pulsar.consumer.default.dead-letter-policy-max-redeliver-count` - How many times should pulsar try to retry sending the message to consumer.
113120
- `pulsar.consumer.default.ack-timeout-ms` - How soon should be the message acked and how soon will dead letter mechanism try to retry to send the message.
114121

src/main/java/io/github/majusko/pulsar/PulsarAutoConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.github.majusko.pulsar;
22

3+
import io.github.majusko.pulsar.properties.ConsumerProperties;
4+
import io.github.majusko.pulsar.properties.PulsarProperties;
35
import org.apache.pulsar.client.api.PulsarClient;
46
import org.apache.pulsar.client.api.PulsarClientException;
57
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

src/main/java/io/github/majusko/pulsar/collector/ConsumerCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import java.util.concurrent.ConcurrentHashMap;
1414
import java.util.stream.Collectors;
1515

16-
import static io.github.majusko.pulsar.PulsarSpringStarterUtils.getParameterType;
16+
import static io.github.majusko.pulsar.utils.SchemaUtils.getParameterType;
1717

1818
@Configuration
1919
public class ConsumerCollector implements BeanPostProcessor {

src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package io.github.majusko.pulsar.consumer;
22

3-
import com.google.protobuf.GeneratedMessageV3;
4-
import io.github.majusko.pulsar.ConsumerProperties;
53
import io.github.majusko.pulsar.PulsarMessage;
6-
import io.github.majusko.pulsar.PulsarSpringStarterUtils;
74
import io.github.majusko.pulsar.collector.ConsumerCollector;
85
import io.github.majusko.pulsar.collector.ConsumerHolder;
9-
import io.github.majusko.pulsar.constant.Serialization;
106
import io.github.majusko.pulsar.error.FailedMessage;
117
import io.github.majusko.pulsar.error.exception.ConsumerInitException;
8+
import io.github.majusko.pulsar.properties.ConsumerProperties;
9+
import io.github.majusko.pulsar.utils.SchemaUtils;
10+
import io.github.majusko.pulsar.utils.TopicUrlService;
1211
import org.apache.pulsar.client.api.*;
1312
import org.springframework.context.EmbeddedValueResolverAware;
1413
import org.springframework.context.annotation.DependsOn;
@@ -31,15 +30,17 @@ public class ConsumerAggregator implements EmbeddedValueResolverAware {
3130
private final ConsumerCollector consumerCollector;
3231
private final PulsarClient pulsarClient;
3332
private final ConsumerProperties consumerProperties;
33+
private final TopicUrlService topicUrlService;
3434

3535
private StringValueResolver stringValueResolver;
3636
private List<Consumer> consumers;
3737

3838
public ConsumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient,
39-
ConsumerProperties consumerProperties) {
39+
ConsumerProperties consumerProperties, TopicUrlService topicUrlService) {
4040
this.consumerCollector = consumerCollector;
4141
this.pulsarClient = pulsarClient;
4242
this.consumerProperties = consumerProperties;
43+
this.topicUrlService = topicUrlService;
4344
}
4445

4546
@PostConstruct
@@ -52,11 +53,13 @@ private void init() {
5253
private Consumer<?> subscribe(String name, ConsumerHolder holder) {
5354
try {
5455
final ConsumerBuilder<?> clientBuilder = pulsarClient
55-
.newConsumer(PulsarSpringStarterUtils.getSchema(holder.getAnnotation().serialization(),
56+
.newConsumer(SchemaUtils.getSchema(holder.getAnnotation().serialization(),
5657
holder.getAnnotation().clazz()))
5758
.consumerName("consumer-" + name)
5859
.subscriptionName("subscription-" + name)
59-
.topic(stringValueResolver.resolveStringValue(holder.getAnnotation().topic()))
60+
.topic(topicUrlService
61+
.buildTopicUrl(stringValueResolver
62+
.resolveStringValue(holder.getAnnotation().topic())))
6063
.subscriptionType(holder.getAnnotation().subscriptionType())
6164
.messageListener((consumer, msg) -> {
6265
try {

src/main/java/io/github/majusko/pulsar/producer/ProducerCollector.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package io.github.majusko.pulsar.producer;
22

3-
import io.github.majusko.pulsar.PulsarSpringStarterUtils;
3+
import io.github.majusko.pulsar.utils.SchemaUtils;
44
import io.github.majusko.pulsar.annotation.PulsarProducer;
55
import io.github.majusko.pulsar.collector.ProducerHolder;
66
import io.github.majusko.pulsar.error.exception.ProducerInitException;
7+
import io.github.majusko.pulsar.utils.TopicUrlService;
78
import org.apache.pulsar.client.api.Producer;
89
import org.apache.pulsar.client.api.PulsarClient;
910
import org.apache.pulsar.client.api.PulsarClientException;
@@ -19,11 +20,13 @@
1920
public class ProducerCollector implements BeanPostProcessor {
2021

2122
private final PulsarClient pulsarClient;
23+
private final TopicUrlService topicUrlService;
2224

2325
private final Map<String, Producer> producers = new ConcurrentHashMap<>();
2426

25-
public ProducerCollector(PulsarClient pulsarClient) {
27+
public ProducerCollector(PulsarClient pulsarClient, TopicUrlService topicUrlService) {
2628
this.pulsarClient = pulsarClient;
29+
this.topicUrlService = topicUrlService;
2730
}
2831

2932
@Override
@@ -47,15 +50,15 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
4750
private Producer<?> buildProducer(ProducerHolder holder) {
4851
try {
4952
return pulsarClient.newProducer(getSchema(holder))
50-
.topic(holder.getTopic())
53+
.topic(topicUrlService.buildTopicUrl(holder.getTopic()))
5154
.create();
5255
} catch (PulsarClientException e) {
5356
throw new ProducerInitException("Failed to init producer.", e);
5457
}
5558
}
5659

5760
private <T> Schema<?> getSchema(ProducerHolder holder) throws RuntimeException {
58-
return PulsarSpringStarterUtils.getSchema(holder.getSerialization(), holder.getClazz());
61+
return SchemaUtils.getSchema(holder.getSerialization(), holder.getClazz());
5962
}
6063

6164
Map<String, Producer> getProducers() {

src/main/java/io/github/majusko/pulsar/ConsumerProperties.java renamed to src/main/java/io/github/majusko/pulsar/properties/ConsumerProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.github.majusko.pulsar;
1+
package io.github.majusko.pulsar.properties;
22

33
import org.springframework.boot.context.properties.ConfigurationProperties;
44

src/main/java/io/github/majusko/pulsar/PulsarProperties.java renamed to src/main/java/io/github/majusko/pulsar/properties/PulsarProperties.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.github.majusko.pulsar;
1+
package io.github.majusko.pulsar.properties;
22

33
import org.springframework.boot.context.properties.ConfigurationProperties;
44

@@ -14,6 +14,8 @@ public class PulsarProperties {
1414
private Integer startingBackoffIntervalMs = 100;
1515
private Integer maxBackoffIntervalSec = 10;
1616
private String consumerNameDelimiter = "";
17+
private String namespace = "default";
18+
private String tenant = "public";
1719

1820
public String getServiceUrl() {
1921
return serviceUrl;
@@ -94,4 +96,20 @@ public String getConsumerNameDelimiter() {
9496
public void setConsumerNameDelimiter(String consumerNameDelimiter) {
9597
this.consumerNameDelimiter = consumerNameDelimiter;
9698
}
99+
100+
public String getNamespace() {
101+
return namespace;
102+
}
103+
104+
public void setNamespace(String namespace) {
105+
this.namespace = namespace;
106+
}
107+
108+
public String getTenant() {
109+
return tenant;
110+
}
111+
112+
public void setTenant(String tenant) {
113+
this.tenant = tenant;
114+
}
97115
}

src/main/java/io/github/majusko/pulsar/PulsarSpringStarterUtils.java renamed to src/main/java/io/github/majusko/pulsar/utils/SchemaUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.github.majusko.pulsar;
1+
package io.github.majusko.pulsar.utils;
22

33
import com.google.protobuf.GeneratedMessageV3;
44
import io.github.majusko.pulsar.constant.Serialization;
@@ -7,9 +7,9 @@
77

88
import java.lang.reflect.Method;
99

10-
public class PulsarSpringStarterUtils {
10+
public class SchemaUtils {
1111

12-
private PulsarSpringStarterUtils() {
12+
private SchemaUtils() {
1313
}
1414

1515
private static <T> Schema<?> getGenericSchema(Serialization serialization, Class<T> clazz) throws RuntimeException {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.github.majusko.pulsar.utils;
2+
3+
import io.github.majusko.pulsar.properties.PulsarProperties;
4+
import org.springframework.stereotype.Service;
5+
6+
@Service
7+
public class TopicUrlService {
8+
9+
private static final String PERSISTENT_PREFIX = "persistent";
10+
private static final String NON_PERSISTENT_PREFIX = "non-persistent";
11+
private static final String DEFAULT_PERSISTENCE = PERSISTENT_PREFIX;
12+
13+
private final PulsarProperties pulsarProperties;
14+
15+
private TopicUrlService(PulsarProperties pulsarProperties) {
16+
this.pulsarProperties = pulsarProperties;
17+
}
18+
19+
public String buildTopicUrl(String topic) {
20+
return DEFAULT_PERSISTENCE + "://" + pulsarProperties.getTenant() + "/" + pulsarProperties.getNamespace() +
21+
"/" + topic;
22+
}
23+
}

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.github.majusko.pulsar.msg.ProtoMsg;
1010
import io.github.majusko.pulsar.producer.ProducerFactory;
1111
import io.github.majusko.pulsar.producer.PulsarTemplate;
12+
import io.github.majusko.pulsar.utils.TopicUrlService;
1213
import org.apache.commons.lang3.tuple.ImmutablePair;
1314
import org.apache.pulsar.client.api.Consumer;
1415
import org.apache.pulsar.client.api.PulsarClientException;
@@ -66,12 +67,15 @@ class PulsarJavaSpringBootStarterApplicationTests {
6667
@Autowired
6768
private PulsarTemplate<String> producerForStringTopic;
6869

69-
@Container
70-
static PulsarContainer pulsarContainer = new PulsarContainer();
71-
7270
@Autowired
7371
private TestConsumers testConsumers;
7472

73+
@Autowired
74+
private TopicUrlService topicUrlService;
75+
76+
@Container
77+
static PulsarContainer pulsarContainer = new PulsarContainer();
78+
7579
public static final String VALIDATION_STRING = "validation-string";
7680

7781
@DynamicPropertySource
@@ -124,7 +128,7 @@ void testConsumerRegistration1() throws Exception {
124128
Assertions.assertEquals(9, consumers.size());
125129

126130
final Consumer<?> consumer =
127-
consumers.stream().filter($ -> $.getTopic().equals("topic-one")).findFirst().orElseThrow(Exception::new);
131+
consumers.stream().filter($ -> $.getTopic().equals(topicUrlService.buildTopicUrl("topic-one"))).findFirst().orElseThrow(Exception::new);
128132

129133
Assertions.assertNotNull(consumer);
130134
}
@@ -165,7 +169,7 @@ void testMessageErrorHandling() throws PulsarClientException {
165169
final AtomicBoolean receivedError = new AtomicBoolean(false);
166170
final String messageToSend = "This message will never arrive.";
167171
final Disposable disposable = consumerAggregator.onError(($) -> {
168-
Assertions.assertEquals($.getConsumer().getTopic(), "topic-for-error");
172+
Assertions.assertEquals($.getConsumer().getTopic(), topicUrlService.buildTopicUrl("topic-for-error"));
169173
Assertions.assertEquals($.getMessage().getValue(), messageToSend);
170174
Assertions.assertNotNull($.getException());
171175

0 commit comments

Comments
 (0)