diff --git a/pulsar/consumer.go b/pulsar/consumer.go index d611c6916e..80bbf01c37 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -75,6 +75,14 @@ type DLQPolicy struct { // DeadLetterTopic specifies the name of the topic where the failing messages will be sent. DeadLetterTopic string + // DeadLetterTopicProducerName specifies a name for the producer specifically for the DLQ topic. + // If not assigned, the system will generate a globally unique name which can be access with + // Producer.ProducerName(). + // When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique + // across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on + // a topic. + DeadLetterTopicProducerName string + // ProducerOptions is the producer options to produce messages to the DLQ and RLQ topic ProducerOptions ProducerOptions diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index b85ad018a9..c36f1f5925 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1853,6 +1853,180 @@ func TestDeadLetterTopicWithInitialSubscription(t *testing.T) { } +func TestWithoutDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/" + newTopicName() + subscriptionName := "default" + consumerName := "my-consumer" + + dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName) + rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName) + + producerName := "producer-name" + RLQProducerName := "rlq-producer-name" + DLQProducerName := "dlq-producer-name" + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subscriptionName, + NackRedeliveryDelay: 1 * time.Millisecond, + Type: Shared, + DLQ: &DLQPolicy{ + MaxDeliveries: 1, + RetryLetterTopic: rlqTopic, + DeadLetterTopic: dlqTopic, + DeadLetterTopicProducerName: DLQProducerName, + ProducerOptions: ProducerOptions{ + Topic: rlqTopic, + Name: RLQProducerName, + }, + }, + Name: consumerName, + RetryEnable: true, + }) + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Name: producerName, + }) + assert.Nil(t, err) + defer producer.Close() + + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte("hello-0"), + }) + assert.Nil(t, err) + + // Validate the name of the original producer + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + assert.Equal(t, msg.ProducerName(), producerName) + consumer.ReconsumeLater(msg, 0) + + // Validate the name of the RLQ producer + msg, err = consumer.Receive(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + assert.Equal(t, msg.ProducerName(), RLQProducerName) + consumer.Nack(msg) + + // Create DLQ consumer + dlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: dlqTopic, + SubscriptionName: subscriptionName, + }) + assert.Nil(t, err) + defer dlqConsumer.Close() + + // Validate the name of the DLQ producer + msg, err = dlqConsumer.Receive(ctx) + defer dlqConsumer.Nack(msg) + + assert.Nil(t, err) + assert.NotNil(t, msg) + assert.Nil(t, err) + assert.Equal(t, msg.ProducerName(), DLQProducerName) +} + +func TestWithDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/" + newTopicName() + subscriptionName := "default" + consumerName := "my-consumer" + + dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName) + rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName) + + producerName := "producer-name" + RLQProducerName := "rlq-producer-name" + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subscriptionName, + NackRedeliveryDelay: 1 * time.Millisecond, + Type: Shared, + DLQ: &DLQPolicy{ + MaxDeliveries: 1, + RetryLetterTopic: rlqTopic, + DeadLetterTopic: dlqTopic, + // Set no producer name for the DLQ explicitly + DeadLetterTopicProducerName: "", + ProducerOptions: ProducerOptions{ + Topic: rlqTopic, + Name: RLQProducerName, + }, + }, + Name: consumerName, + RetryEnable: true, + }) + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Name: producerName, + }) + assert.Nil(t, err) + defer producer.Close() + + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte("hello-0"), + }) + assert.Nil(t, err) + + // Validate the name of the original producer + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + assert.Equal(t, msg.ProducerName(), producerName) + consumer.ReconsumeLater(msg, 0) + + // Validate the name of the RLQ producer + msg, err = consumer.Receive(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + assert.Equal(t, msg.ProducerName(), RLQProducerName) + consumer.Nack(msg) + + // Create DLQ consumer + dlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: dlqTopic, + SubscriptionName: subscriptionName, + }) + assert.Nil(t, err) + defer dlqConsumer.Close() + + // Validate the name of the DLQ producer + msg, err = dlqConsumer.Receive(ctx) + defer dlqConsumer.Nack(msg) + + assert.Nil(t, err) + assert.NotNil(t, msg) + regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, subscriptionName, consumerName)) + assert.True(t, regex.MatchString(msg.ProducerName())) +} + func TestDLQMultiTopics(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 3fef99a3a5..3bacca19e5 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -161,8 +161,10 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { opt := r.policy.ProducerOptions opt.Topic = r.policy.DeadLetterTopic opt.Schema = schema - if opt.Name == "" { + if r.policy.DeadLetterTopicProducerName == "" { opt.Name = fmt.Sprintf("%s-%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName, generateRandomName()) + } else { + opt.Name = r.policy.DeadLetterTopicProducerName } opt.initialSubscriptionName = r.policy.InitialSubscriptionName