Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name,
var sourceTopic string
if options.RetryEnable && len(options.Topics) == 2 && options.Topics[1] == options.DLQ.RetryLetterTopic {
// when RetryEnable=true, options.Topic and RetryLetterTopic will be appended to the options.Topics
// we need to try to find previous options.Topic from options.Topics
sourceTopic = options.Topics[0]
} else {
sourceTopic = options.Topic
}
dlq, err := newDlqRouter(client, options.DLQ, sourceTopic, options.SubscriptionName, options.Name,
options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
Expand Down
11 changes: 11 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1965,6 +1965,7 @@ func TestRLQ(t *testing.T) {
makeHTTPCall(t, http.MethodPut, testURL, "3")

subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
consumerName := "my-consumer"
maxRedeliveries := 2
N := 100
ctx := context.Background()
Expand Down Expand Up @@ -2002,6 +2003,7 @@ func TestRLQ(t *testing.T) {
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
Name: consumerName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{
Expand Down Expand Up @@ -2068,6 +2070,10 @@ func TestRLQ(t *testing.T) {
assert.LessOrEqual(t, eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
assert.LessOrEqual(t, msg.EventTime(), eventTimeList[N-1].Add(2*time.Millisecond))

// check dlq produceName
regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, subName, consumerName))
assert.True(t, regex.MatchString(msg.ProducerName()))

assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
Expand Down Expand Up @@ -2360,6 +2366,7 @@ func TestRLQMultiTopics(t *testing.T) {
topics := []string{topic01, topic02}

subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
consumerName := "my-consumer"
maxRedeliveries := 2
N := 100
ctx := context.Background()
Expand All @@ -2372,6 +2379,7 @@ func TestRLQMultiTopics(t *testing.T) {
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: subName,
Name: consumerName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)},
Expand Down Expand Up @@ -2426,9 +2434,12 @@ func TestRLQMultiTopics(t *testing.T) {

// 3. Create consumer on the DLQ topic to verify the routing
dlqReceived := 0
// check dlq produceName
regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", "", subName, consumerName))
for dlqReceived < 2*N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
assert.True(t, regex.MatchString(msg.ProducerName()))
dlqConsumer.Ack(msg)
dlqReceived++
}
Expand Down