diff --git a/integration-tests/conf/standalone.conf b/integration-tests/conf/standalone.conf index c816c8fd16..5cc004ed1f 100644 --- a/integration-tests/conf/standalone.conf +++ b/integration-tests/conf/standalone.conf @@ -83,6 +83,8 @@ maxUnackedMessagesPerConsumer=50000 # Set maxMessageSize to 1MB rather than the default value 5MB for testing maxMessageSize=1048576 +brokerDeduplicationEnabled=true + ### --- Authentication --- ### # Enable TLS diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index fbcc5b9776..268da5f94c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1274,14 +1274,18 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) if pi.sequenceID < response.GetSequenceId() { // Force connection closing so that messages can be re-transmitted in a new connection - p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local < remote, closing connection", - response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) + p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local < remote, "+ + "closing connection", + int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), + pi.sequenceID) p._getConn().Close() return } else if pi.sequenceID > response.GetSequenceId() { // Ignoring the ack since it's referring to a message that has already timed out. - p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local > remote, ignore it", - response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) + p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local > remote, "+ + "ignore it", + int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), + pi.sequenceID) return } else { // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index ba5911565e..9eea9abec2 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2385,6 +2385,36 @@ func TestSendConcurrently(t *testing.T) { wg.Wait() } +func TestProducerSendDuplicatedMessages(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + testProducer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + + assert.NoError(t, err) + assert.NotNil(t, testProducer) + _, err = testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1024), + }) + assert.NoError(t, err) + for i := 0; i < 3; i++ { + var seqID int64 + msgID, err := testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1024), + SequenceID: &seqID, + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + assert.Equal(t, int64(-1), msgID.LedgerID()) + assert.Equal(t, int64(-1), msgID.EntryID()) + } + testProducer.Close() +} + type pendingQueueWrapper struct { pendingQueue internal.BlockingQueue writtenBuffers *[]internal.Buffer