Skip to content

Commit 4392621

Browse files
authored
Fix missing topic in dlq producer name when using RetryEnable option (#1412)
1 parent ba6468a commit 4392621

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

pulsar/consumer_impl.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
181181
}
182182
}
183183

184-
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name,
184+
var sourceTopic string
185+
if options.RetryEnable && len(options.Topics) == 2 && options.Topics[1] == options.DLQ.RetryLetterTopic {
186+
// when RetryEnable=true, options.Topic and RetryLetterTopic will be appended to the options.Topics
187+
// we need to try to find previous options.Topic from options.Topics
188+
sourceTopic = options.Topics[0]
189+
} else {
190+
sourceTopic = options.Topic
191+
}
192+
dlq, err := newDlqRouter(client, options.DLQ, sourceTopic, options.SubscriptionName, options.Name,
185193
options.BackOffPolicyFunc, client.log)
186194
if err != nil {
187195
return nil, err

pulsar/consumer_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1965,6 +1965,7 @@ func TestRLQ(t *testing.T) {
19651965
makeHTTPCall(t, http.MethodPut, testURL, "3")
19661966

19671967
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
1968+
consumerName := "my-consumer"
19681969
maxRedeliveries := 2
19691970
N := 100
19701971
ctx := context.Background()
@@ -2002,6 +2003,7 @@ func TestRLQ(t *testing.T) {
20022003
rlqConsumer, err := client.Subscribe(ConsumerOptions{
20032004
Topic: topic,
20042005
SubscriptionName: subName,
2006+
Name: consumerName,
20052007
Type: Shared,
20062008
SubscriptionInitialPosition: SubscriptionPositionEarliest,
20072009
DLQ: &DLQPolicy{
@@ -2068,6 +2070,10 @@ func TestRLQ(t *testing.T) {
20682070
assert.LessOrEqual(t, eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
20692071
assert.LessOrEqual(t, msg.EventTime(), eventTimeList[N-1].Add(2*time.Millisecond))
20702072

2073+
// check dlq produceName
2074+
regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, subName, consumerName))
2075+
assert.True(t, regex.MatchString(msg.ProducerName()))
2076+
20712077
assert.Nil(t, err)
20722078
dlqConsumer.Ack(msg)
20732079
dlqReceived++
@@ -2360,6 +2366,7 @@ func TestRLQMultiTopics(t *testing.T) {
23602366
topics := []string{topic01, topic02}
23612367

23622368
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
2369+
consumerName := "my-consumer"
23632370
maxRedeliveries := 2
23642371
N := 100
23652372
ctx := context.Background()
@@ -2372,6 +2379,7 @@ func TestRLQMultiTopics(t *testing.T) {
23722379
rlqConsumer, err := client.Subscribe(ConsumerOptions{
23732380
Topics: topics,
23742381
SubscriptionName: subName,
2382+
Name: consumerName,
23752383
Type: Shared,
23762384
SubscriptionInitialPosition: SubscriptionPositionEarliest,
23772385
DLQ: &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)},
@@ -2426,9 +2434,12 @@ func TestRLQMultiTopics(t *testing.T) {
24262434

24272435
// 3. Create consumer on the DLQ topic to verify the routing
24282436
dlqReceived := 0
2437+
// check dlq produceName
2438+
regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", "", subName, consumerName))
24292439
for dlqReceived < 2*N {
24302440
msg, err := dlqConsumer.Receive(ctx)
24312441
assert.Nil(t, err)
2442+
assert.True(t, regex.MatchString(msg.ProducerName()))
24322443
dlqConsumer.Ack(msg)
24332444
dlqReceived++
24342445
}

0 commit comments

Comments
 (0)