Skip to content

Commit 4e13822

Browse files
authored
[fix] Fix DLQ producer name conflicts when multiples consumers send messages to DLQ (#1156)
### Motivation To keep consistent with the Java client. Releted PR: apache/pulsar#21890 ### Modifications Set DLQ producerName `fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)`
1 parent 9347690 commit 4e13822

File tree

5 files changed

+14
-8
lines changed

5 files changed

+14
-8
lines changed

pulsar/consumer_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
167167
}
168168
}
169169

170-
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, client.log)
170+
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log)
171171
if err != nil {
172172
return nil, err
173173
}

pulsar/consumer_regex_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,10 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
152152
opts := ConsumerOptions{
153153
SubscriptionName: "regex-sub",
154154
AutoDiscoveryPeriod: 5 * time.Minute,
155+
Name: "regex-consumer",
155156
}
156157

157-
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger())
158+
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
158159
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
159160
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
160161
if err != nil {
@@ -190,9 +191,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
190191
opts := ConsumerOptions{
191192
SubscriptionName: "regex-sub",
192193
AutoDiscoveryPeriod: 5 * time.Minute,
194+
Name: "regex-consumer",
193195
}
194196

195-
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger())
197+
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
196198
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
197199
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
198200
if err != nil {

pulsar/consumer_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1449,13 +1449,15 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
14491449
if prodOpt != nil {
14501450
dlqPolicy.ProducerOptions = *prodOpt
14511451
}
1452-
sub := "my-sub"
1452+
sub, consumerName := "my-sub", "my-consumer"
1453+
14531454
consumer, err := client.Subscribe(ConsumerOptions{
14541455
Topic: topic,
14551456
SubscriptionName: sub,
14561457
NackRedeliveryDelay: 1 * time.Second,
14571458
Type: Shared,
14581459
DLQ: &dlqPolicy,
1460+
Name: consumerName,
14591461
})
14601462
assert.Nil(t, err)
14611463
defer consumer.Close()
@@ -1508,7 +1510,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
15081510
assert.Equal(t, []byte(expectMsg), msg.Payload())
15091511

15101512
// check dql produceName
1511-
assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", topic, sub))
1513+
assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", topic, sub, consumerName))
15121514

15131515
// check original messageId
15141516
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])

pulsar/dlq_router.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,18 @@ type dlqRouter struct {
3434
closeCh chan interface{}
3535
topicName string
3636
subscriptionName string
37+
consumerName string
3738
log log.Logger
3839
}
3940

40-
func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string,
41+
func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName, consumerName string,
4142
logger log.Logger) (*dlqRouter, error) {
4243
r := &dlqRouter{
4344
client: client,
4445
policy: policy,
4546
topicName: topicName,
4647
subscriptionName: subscriptionName,
48+
consumerName: consumerName,
4749
log: logger,
4850
}
4951

@@ -159,7 +161,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
159161
opt.Topic = r.policy.DeadLetterTopic
160162
opt.Schema = schema
161163
if opt.Name == "" {
162-
opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, r.subscriptionName)
164+
opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)
163165
}
164166

165167
// the origin code sets to LZ4 compression with no options

pulsar/reader_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
127127
}
128128

129129
// Provide dummy dlq router with not dlq policy
130-
dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, client.log)
130+
dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log)
131131
if err != nil {
132132
return nil, err
133133
}

0 commit comments

Comments
 (0)