Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ type DLQPolicy struct {
// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
DeadLetterTopic string

// DeadLetterTopicProducerName specifies a name for the producer specifically for the DQL topic.
// If not assigned, the system will generate a globally unique name which can be access with
// Producer.ProducerName().
// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
// across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on
// a topic.
DeadLetterTopicProducerName string

// ProducerOptions is the producer options to produce messages to the DLQ and RLQ topic
ProducerOptions ProducerOptions

Expand Down
175 changes: 175 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,181 @@ func TestDeadLetterTopicWithInitialSubscription(t *testing.T) {

}

func TestWithoutDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := "persistent://public/default/" + newTopicName()
subscriptionName := "default"
consumerName := "my-consumer"

dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName)
rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName)

producerName := "producer-name"
RLQProducerName := "rlq-producer-name"
DLQProducerName := "dlq-producer-name"

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
NackRedeliveryDelay: 1 * time.Millisecond,
Type: Shared,
DLQ: &DLQPolicy{
MaxDeliveries: 1,
RetryLetterTopic: rlqTopic,
DeadLetterTopic: dlqTopic,
DeadLetterTopicProducerName: DLQProducerName,
ProducerOptions: ProducerOptions{
Topic: rlqTopic,
Name: RLQProducerName,
},
},
Name: consumerName,
RetryEnable: true,
})
assert.Nil(t, err)
defer consumer.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Name: producerName,
})
assert.Nil(t, err)
defer producer.Close()

_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("hello-0"),
})
assert.Nil(t, err)

// Validate the name of the original producer
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msg.ProducerName(), producerName)
consumer.ReconsumeLater(msg, 0)

// Validate the name of the RLQ producer
msg, err = consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msg.ProducerName(), RLQProducerName)
consumer.Nack(msg)

// Create DLQ consumer
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: dlqTopic,
SubscriptionName: subscriptionName,
})
assert.Nil(t, err)
defer dlqConsumer.Close()

// Validate the name of the DLQ producer
msg, err = dlqConsumer.Receive(ctx)
defer dlqConsumer.Nack(msg)

assert.Nil(t, err)
assert.NotNil(t, msg)
assert.Nil(t, err)
assert.Equal(t, msg.ProducerName(), DLQProducerName)
}

func TestWithDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := "persistent://public/default/" + newTopicName()
subscriptionName := "default"
consumerName := "my-consumer"

dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName)
rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName)

producerName := "producer-name"
RLQProducerName := "rlq-producer-name"

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
NackRedeliveryDelay: 1 * time.Millisecond,
Type: Shared,
DLQ: &DLQPolicy{
MaxDeliveries: 1,
RetryLetterTopic: rlqTopic,
DeadLetterTopic: dlqTopic,
// Set no producer name for the DLQ explicitly
DeadLetterTopicProducerName: "",
ProducerOptions: ProducerOptions{
Topic: rlqTopic,
Name: RLQProducerName,
},
},
Name: consumerName,
RetryEnable: true,
})
assert.Nil(t, err)
defer consumer.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Name: producerName,
})
assert.Nil(t, err)
defer producer.Close()

_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("hello-0"),
})
assert.Nil(t, err)

// Validate the name of the original producer
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msg.ProducerName(), producerName)
consumer.ReconsumeLater(msg, 0)

// Validate the name of the RLQ producer
msg, err = consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
assert.Equal(t, msg.ProducerName(), RLQProducerName)
consumer.Nack(msg)

// Create DLQ consumer
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: dlqTopic,
SubscriptionName: subscriptionName,
})
assert.Nil(t, err)
defer dlqConsumer.Close()

// Validate the name of the DLQ producer
msg, err = dlqConsumer.Receive(ctx)
defer dlqConsumer.Nack(msg)

assert.Nil(t, err)
assert.NotNil(t, msg)
assert.Nil(t, err)
regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, subscriptionName, consumerName))
assert.True(t, regex.MatchString(msg.ProducerName()))
}

func TestDLQMultiTopics(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down
4 changes: 3 additions & 1 deletion pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
if opt.Name == "" {
if r.policy.DeadLetterTopicProducerName == "" {
opt.Name = fmt.Sprintf("%s-%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName, generateRandomName())
} else {
opt.Name = r.policy.DeadLetterTopicProducerName
}
opt.initialSubscriptionName = r.policy.InitialSubscriptionName

Expand Down