From 9bd16e54fa13eade55c77d5fd7b7b0e2a5a59874 Mon Sep 17 00:00:00 2001 From: ninjazhou <843520313@qq.com> Date: Wed, 27 Aug 2025 16:17:11 +0800 Subject: [PATCH] Fix missing topic in dlq producer name when using RetryEnable option --- pulsar/consumer_impl.go | 10 +++++++++- pulsar/consumer_test.go | 11 +++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 5f6dac3d80..db4abcc476 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -181,7 +181,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } - dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, + var sourceTopic string + if options.RetryEnable && len(options.Topics) == 2 && options.Topics[1] == options.DLQ.RetryLetterTopic { + // when RetryEnable=true, options.Topic and RetryLetterTopic will be appended to the options.Topics + // we need to try to find previous options.Topic from options.Topics + sourceTopic = options.Topics[0] + } else { + sourceTopic = options.Topic + } + dlq, err := newDlqRouter(client, options.DLQ, sourceTopic, options.SubscriptionName, options.Name, options.BackOffPolicyFunc, client.log) if err != nil { return nil, err diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 8a4ea3f34e..b85ad018a9 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1965,6 +1965,7 @@ func TestRLQ(t *testing.T) { makeHTTPCall(t, http.MethodPut, testURL, "3") subName := fmt.Sprintf("sub01-%d", time.Now().Unix()) + consumerName := "my-consumer" maxRedeliveries := 2 N := 100 ctx := context.Background() @@ -2002,6 +2003,7 @@ func TestRLQ(t *testing.T) { rlqConsumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: subName, + Name: consumerName, Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, DLQ: &DLQPolicy{ @@ -2068,6 +2070,10 @@ func TestRLQ(t *testing.T) { assert.LessOrEqual(t, eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime()) assert.LessOrEqual(t, msg.EventTime(), eventTimeList[N-1].Add(2*time.Millisecond)) + // check dlq produceName + regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, subName, consumerName)) + assert.True(t, regex.MatchString(msg.ProducerName())) + assert.Nil(t, err) dlqConsumer.Ack(msg) dlqReceived++ @@ -2360,6 +2366,7 @@ func TestRLQMultiTopics(t *testing.T) { topics := []string{topic01, topic02} subName := fmt.Sprintf("sub01-%d", time.Now().Unix()) + consumerName := "my-consumer" maxRedeliveries := 2 N := 100 ctx := context.Background() @@ -2372,6 +2379,7 @@ func TestRLQMultiTopics(t *testing.T) { rlqConsumer, err := client.Subscribe(ConsumerOptions{ Topics: topics, SubscriptionName: subName, + Name: consumerName, Type: Shared, SubscriptionInitialPosition: SubscriptionPositionEarliest, DLQ: &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)}, @@ -2426,9 +2434,12 @@ func TestRLQMultiTopics(t *testing.T) { // 3. Create consumer on the DLQ topic to verify the routing dlqReceived := 0 + // check dlq produceName + regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", "", subName, consumerName)) for dlqReceived < 2*N { msg, err := dlqConsumer.Receive(ctx) assert.Nil(t, err) + assert.True(t, regex.MatchString(msg.ProducerName())) dlqConsumer.Ack(msg) dlqReceived++ }