Skip to content

Commit 279e1d7

Browse files
authored
[Issue 1297][consumer] Fix DLQ producer name conflicts when there are same name consumers (#1314)
1 parent 7bbb5b2 commit 279e1d7

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

pulsar/consumer_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"log"
2525
"net/http"
2626
"os"
27+
"regexp"
2728
"strconv"
2829
"sync"
2930
"sync/atomic"
@@ -1521,8 +1522,9 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
15211522
expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
15221523
assert.Equal(t, []byte(expectMsg), msg.Payload())
15231524

1524-
// check dql produceName
1525-
assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", topic, sub, consumerName))
1525+
// check dlq produceName
1526+
regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, sub, consumerName))
1527+
assert.True(t, regex.MatchString(msg.ProducerName()))
15261528

15271529
// check original messageId
15281530
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])

pulsar/dlq_router.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
172172
opt.Topic = r.policy.DeadLetterTopic
173173
opt.Schema = schema
174174
if opt.Name == "" {
175-
opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)
175+
opt.Name = fmt.Sprintf("%s-%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName, generateRandomName())
176176
}
177177
opt.initialSubscriptionName = r.policy.InitialSubscriptionName
178178

0 commit comments

Comments
 (0)