Skip to content

Commit 859b52c

Browse files
authored
Merge pull request #85 from majusko/bugfix/dead-letter-topic-assigning
Fixed the redelivery topic assigning. Added test coverage.
2 parents 1a25056 + d7c6865 commit 859b52c

File tree

4 files changed

+75
-38
lines changed

4 files changed

+75
-38
lines changed

src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private void init() {
5252

5353
private Consumer<?> subscribe(String name, ConsumerHolder holder) {
5454
try {
55-
final ConsumerBuilder<?> clientBuilder = pulsarClient
55+
final ConsumerBuilder<?> consumerBuilder = pulsarClient
5656
.newConsumer(SchemaUtils.getSchema(holder.getAnnotation().serialization(),
5757
holder.getAnnotation().clazz()))
5858
.consumerName("consumer-" + name)
@@ -67,18 +67,7 @@ private Consumer<?> subscribe(String name, ConsumerHolder holder) {
6767
method.setAccessible(true);
6868

6969
if (holder.isWrapped()) {
70-
PulsarMessage pulsarMessage = new PulsarMessage();
71-
pulsarMessage.setValue(msg.getValue());
72-
pulsarMessage.setMessageId(msg.getMessageId());
73-
pulsarMessage.setSequenceId(msg.getSequenceId());
74-
pulsarMessage.setProperties(msg.getProperties());
75-
pulsarMessage.setTopicName(msg.getTopicName());
76-
pulsarMessage.setKey(msg.getKey());
77-
pulsarMessage.setEventTime(msg.getEventTime());
78-
pulsarMessage.setPublishTime(msg.getPublishTime());
79-
pulsarMessage.setProducerName(msg.getProducerName());
80-
81-
method.invoke(holder.getBean(), pulsarMessage);
70+
method.invoke(holder.getBean(), wrapMessage(msg));
8271
} else {
8372
method.invoke(holder.getBean(), msg.getValue());
8473
}
@@ -90,32 +79,56 @@ private Consumer<?> subscribe(String name, ConsumerHolder holder) {
9079
}
9180
});
9281

93-
if (consumerProperties.getDeadLetterPolicyMaxRedeliverCount() >= 0) {
94-
clientBuilder.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(consumerProperties.getDeadLetterPolicyMaxRedeliverCount()).build());
95-
}
96-
9782
if (consumerProperties.getAckTimeoutMs() > 0) {
98-
clientBuilder.ackTimeout(consumerProperties.getAckTimeoutMs(), TimeUnit.MILLISECONDS);
83+
consumerBuilder.ackTimeout(consumerProperties.getAckTimeoutMs(), TimeUnit.MILLISECONDS);
9984
}
10085

101-
if (holder.getAnnotation().maxRedeliverCount() >= 0) {
102-
final DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterBuilder = DeadLetterPolicy.builder();
86+
buildDeadLetterPolicy(holder, consumerBuilder);
10387

104-
deadLetterBuilder.maxRedeliverCount(holder.getAnnotation().maxRedeliverCount());
88+
return consumerBuilder.subscribe();
89+
} catch (PulsarClientException e) {
90+
throw new ConsumerInitException("Failed to init consumer.", e);
91+
}
92+
}
10593

106-
if (!holder.getAnnotation().deadLetterTopic().isEmpty()) {
107-
deadLetterBuilder.deadLetterTopic(holder.getAnnotation().deadLetterTopic());
108-
}
94+
public void buildDeadLetterPolicy(ConsumerHolder holder, ConsumerBuilder<?> consumerBuilder) {
95+
DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterBuilder = null;
10996

110-
clientBuilder.deadLetterPolicy(deadLetterBuilder.build());
111-
}
97+
if (consumerProperties.getDeadLetterPolicyMaxRedeliverCount() >= 0) {
98+
deadLetterBuilder =
99+
DeadLetterPolicy.builder().maxRedeliverCount(consumerProperties.getDeadLetterPolicyMaxRedeliverCount());
100+
}
112101

113-
return clientBuilder.subscribe();
114-
} catch (PulsarClientException e) {
115-
throw new ConsumerInitException("Failed to init consumer.", e);
102+
if (holder.getAnnotation().maxRedeliverCount() >= 0) {
103+
deadLetterBuilder =
104+
DeadLetterPolicy.builder().maxRedeliverCount(holder.getAnnotation().maxRedeliverCount());
105+
}
106+
107+
if (deadLetterBuilder != null && !holder.getAnnotation().deadLetterTopic().isEmpty()) {
108+
deadLetterBuilder.deadLetterTopic(topicUrlService.buildTopicUrl(holder.getAnnotation().deadLetterTopic()));
109+
}
110+
111+
if (deadLetterBuilder != null) {
112+
consumerBuilder.deadLetterPolicy(deadLetterBuilder.build());
116113
}
117114
}
118115

116+
public <T> PulsarMessage<T> wrapMessage(Message<T> message) {
117+
final PulsarMessage<T> pulsarMessage = new PulsarMessage<T>();
118+
119+
pulsarMessage.setValue(message.getValue());
120+
pulsarMessage.setMessageId(message.getMessageId());
121+
pulsarMessage.setSequenceId(message.getSequenceId());
122+
pulsarMessage.setProperties(message.getProperties());
123+
pulsarMessage.setTopicName(message.getTopicName());
124+
pulsarMessage.setKey(message.getKey());
125+
pulsarMessage.setEventTime(message.getEventTime());
126+
pulsarMessage.setPublishTime(message.getPublishTime());
127+
pulsarMessage.setProducerName(message.getProducerName());
128+
129+
return pulsarMessage;
130+
}
131+
119132
public List<Consumer> getConsumers() {
120133
return consumers;
121134
}

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ void testBasicDeadLetterRetryPolicy() throws PulsarClientException {
9797

9898
await().untilTrue(testConsumers.mockRetryCountListenerReceived);
9999

100-
Assertions.assertEquals(3, testConsumers.retryCount.get());
100+
Assertions.assertEquals(3, testConsumers.failTwiceRetryCount.get());
101101
}
102102

103103
@Test
@@ -125,7 +125,7 @@ void testProducerCreateMessageMethod() throws PulsarClientException {
125125
void testConsumerRegistration1() throws Exception {
126126
final List<Consumer> consumers = consumerAggregator.getConsumers();
127127

128-
Assertions.assertEquals(9, consumers.size());
128+
Assertions.assertEquals(11, consumers.size());
129129

130130
final Consumer<?> consumer =
131131
consumers.stream().filter($ -> $.getTopic().equals(topicUrlService.buildTopicUrl("topic-one"))).findFirst().orElseThrow(Exception::new);
@@ -156,7 +156,7 @@ void testProducerRegistration() {
156156

157157
final Map<String, ImmutablePair<Class<?>, Serialization>> topics = producerFactory.getTopics();
158158

159-
Assertions.assertEquals(10, topics.size());
159+
Assertions.assertEquals(11, topics.size());
160160

161161
final Set<String> topicNames = new HashSet<>(topics.keySet());
162162

@@ -211,4 +211,10 @@ void stringSerializationTestOk() throws Exception {
211211
producerForStringTopic.send("topic-string", VALIDATION_STRING);
212212
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.stringTopicReceived.get());
213213
}
214+
215+
@Test
216+
void dealLetterTopicDelivery() throws Exception {
217+
producer.send("topic-deliver-to-dead-letter", new MyMsg(VALIDATION_STRING));
218+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.subscribeToDeadLetterTopicReceived.get());
219+
}
214220
}

src/test/java/io/github/majusko/pulsar/TestConsumers.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ public class TestConsumers {
2424
public AtomicBoolean byteTopicReceived = new AtomicBoolean(false);
2525
public AtomicBoolean stringTopicReceived = new AtomicBoolean(false);
2626
public AtomicBoolean mockRetryCountListenerReceived = new AtomicBoolean(false);
27-
public AtomicInteger retryCount = new AtomicInteger(0);
27+
public AtomicBoolean subscribeToDeadLetterTopicReceived = new AtomicBoolean(false);
28+
public AtomicInteger failTwiceRetryCount = new AtomicInteger(0);
29+
public AtomicInteger topicOverflowDueToExceptionRetryCount = new AtomicInteger(0);
2830

2931
@PulsarConsumer(topic = "topic-one", clazz = MyMsg.class, serialization = Serialization.JSON)
3032
public void topicOneListener(MyMsg myMsg) {
@@ -85,20 +87,35 @@ public void topicMessageListener(PulsarMessage<MyMsg> myMsg) {
8587
mockTopicMessageListenerReceived.set(true);
8688
}
8789

88-
@PulsarConsumer(topic = "topic-retry", clazz = MyMsg.class, maxRedeliverCount = 3, subscriptionType = SubscriptionType.Shared, deadLetterTopic = "dead-letter-topic")
90+
@PulsarConsumer(topic = "topic-retry", clazz = MyMsg.class, maxRedeliverCount = 3, subscriptionType = SubscriptionType.Shared)
8991
public void failTwice(MyMsg myMsg) throws Exception {
90-
int retryAttempt = retryCount.getAndIncrement();
92+
int retryAttempt = failTwiceRetryCount.getAndIncrement();
9193

9294
if(retryAttempt < 2) {
9395
throw new Exception("Expected msg fail.");
9496
}
9597
Assertions.assertNotNull(myMsg);
9698
mockRetryCountListenerReceived.set(true);
99+
}
100+
97101

102+
@PulsarConsumer(topic = "topic-deliver-to-dead-letter", clazz = MyMsg.class, subscriptionType = SubscriptionType.Shared, deadLetterTopic = "custom-dead-letter-topic")
103+
public void topicOverflowDueToException(MyMsg myMsg) throws Exception {
104+
int retryAttempt = topicOverflowDueToExceptionRetryCount.getAndIncrement();
105+
106+
Assertions.assertNotNull(myMsg);
107+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, myMsg.getData());
98108

109+
if(retryAttempt < 2) {
110+
throw new Exception("Expected msg fail.");
111+
}
112+
Assertions.fail();
99113
}
100114

101-
public static Serialization aa() {
102-
return Serialization.BYTE;
115+
@PulsarConsumer(topic = "custom-dead-letter-topic", clazz = MyMsg.class)
116+
public void subscribeToDeadLetterTopic(MyMsg myMsg) {
117+
Assertions.assertNotNull(myMsg);
118+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, myMsg.getData());
119+
subscribeToDeadLetterTopicReceived.set(true);
103120
}
104121
}

src/test/java/io/github/majusko/pulsar/TestProducerConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public ProducerFactory producerFactory() {
2424
.addProducer("topic-retry", MyMsg.class)
2525
.addProducer("topic-string", String.class, Serialization.STRING)
2626
.addProducer("topic-byte")
27-
.addProducer("topic-proto", ProtoMsg.class, Serialization.PROTOBUF);
27+
.addProducer("topic-proto", ProtoMsg.class, Serialization.PROTOBUF)
28+
.addProducer("topic-deliver-to-dead-letter", MyMsg.class);
2829
}
2930
}

0 commit comments

Comments
 (0)