From 843e991c952305160be8eac477095a27b6b62657 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 29 Jan 2024 11:42:26 +0800 Subject: [PATCH 1/4] Fix incorrect ledgerID and entryID logged when producer receives unexpected ack --- pulsar/producer_partition.go | 8 ++++---- pulsar/producer_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e38..b8bf999e97 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1274,14 +1274,14 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) if pi.sequenceID < response.GetSequenceId() { // Force connection closing so that messages can be re-transmitted in a new connection - p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local < remote, closing connection", - response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) + p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local < remote, closing connection", + int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), pi.sequenceID) p._getConn().Close() return } else if pi.sequenceID > response.GetSequenceId() { // Ignoring the ack since it's referring to a message that has already timed out. - p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local > remote, ignore it", - response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) + p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local > remote, ignore it", + int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), pi.sequenceID) return } else { // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0d74cdeefe..6c1a247372 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2357,6 +2357,36 @@ func TestFailPendingMessageWithClose(t *testing.T) { assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size()) } +func TestProducerSendDuplicatedMessages(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + testProducer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + + assert.NoError(t, err) + assert.NotNil(t, testProducer) + _, err = testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1024), + }) + assert.NoError(t, err) + for i := 0; i < 3; i++ { + var seqId int64 = 0 + msgId, err := testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1024), + SequenceID: &seqId, + }) + assert.NoError(t, err) + assert.NotNil(t, msgId) + assert.Equal(t, int64(-1), msgId.LedgerID()) + assert.Equal(t, int64(-1), msgId.EntryID()) + } + testProducer.Close() +} + type pendingQueueWrapper struct { pendingQueue internal.BlockingQueue writtenBuffers *[]internal.Buffer From ba6c9fc816e0d4ffc6341952ddd2d8018b0acc09 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 31 Jan 2024 17:06:02 +0800 Subject: [PATCH 2/4] fix lint --- pulsar/producer_partition.go | 12 ++++++++---- pulsar/producer_test.go | 10 +++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b8bf999e97..6cb1885305 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1274,14 +1274,18 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) if pi.sequenceID < response.GetSequenceId() { // Force connection closing so that messages can be re-transmitted in a new connection - p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local < remote, closing connection", - int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), pi.sequenceID) + p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local < remote, "+ + "closing connection", + int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), + pi.sequenceID) p._getConn().Close() return } else if pi.sequenceID > response.GetSequenceId() { // Ignoring the ack since it's referring to a message that has already timed out. - p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local > remote, ignore it", - int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), pi.sequenceID) + p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local > remote, "+ + "ignore it", + int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), + pi.sequenceID) return } else { // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 6c1a247372..d3166c3c4d 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2374,15 +2374,15 @@ func TestProducerSendDuplicatedMessages(t *testing.T) { }) assert.NoError(t, err) for i := 0; i < 3; i++ { - var seqId int64 = 0 - msgId, err := testProducer.Send(context.Background(), &ProducerMessage{ + var seqId int64 + msgID, err := testProducer.Send(context.Background(), &ProducerMessage{ Payload: make([]byte, 1024), SequenceID: &seqId, }) assert.NoError(t, err) - assert.NotNil(t, msgId) - assert.Equal(t, int64(-1), msgId.LedgerID()) - assert.Equal(t, int64(-1), msgId.EntryID()) + assert.NotNil(t, msgID) + assert.Equal(t, int64(-1), msgID.LedgerID()) + assert.Equal(t, int64(-1), msgID.EntryID()) } testProducer.Close() } From 0bea7cdd5ef478c036e550d63d5dc9b497c59c02 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 31 Jan 2024 17:09:07 +0800 Subject: [PATCH 3/4] fix lint --- pulsar/producer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index d3166c3c4d..9bea99ea59 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2374,10 +2374,10 @@ func TestProducerSendDuplicatedMessages(t *testing.T) { }) assert.NoError(t, err) for i := 0; i < 3; i++ { - var seqId int64 + var seqID int64 msgID, err := testProducer.Send(context.Background(), &ProducerMessage{ Payload: make([]byte, 1024), - SequenceID: &seqId, + SequenceID: &seqID, }) assert.NoError(t, err) assert.NotNil(t, msgID) From 826696f2ab3f4d4bb81a6181ba5daaf8d5160c33 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 1 Feb 2024 15:08:03 +0800 Subject: [PATCH 4/4] Enable brokerDeduplication --- integration-tests/conf/standalone.conf | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-tests/conf/standalone.conf b/integration-tests/conf/standalone.conf index c816c8fd16..5cc004ed1f 100644 --- a/integration-tests/conf/standalone.conf +++ b/integration-tests/conf/standalone.conf @@ -83,6 +83,8 @@ maxUnackedMessagesPerConsumer=50000 # Set maxMessageSize to 1MB rather than the default value 5MB for testing maxMessageSize=1048576 +brokerDeduplicationEnabled=true + ### --- Authentication --- ### # Enable TLS