Skip to content

Commit 843e991

Browse files
committed
Fix incorrect ledgerID and entryID logged when producer receives unexpected ack
1 parent 3a1a766 commit 843e991

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

pulsar/producer_partition.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,14 +1274,14 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
12741274

12751275
if pi.sequenceID < response.GetSequenceId() {
12761276
// Force connection closing so that messages can be re-transmitted in a new connection
1277-
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local < remote, closing connection",
1278-
response.GetMessageId(), response.GetSequenceId(), pi.sequenceID)
1277+
p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local < remote, closing connection",
1278+
int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), pi.sequenceID)
12791279
p._getConn().Close()
12801280
return
12811281
} else if pi.sequenceID > response.GetSequenceId() {
12821282
// Ignoring the ack since it's referring to a message that has already timed out.
1283-
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local > remote, ignore it",
1284-
response.GetMessageId(), response.GetSequenceId(), pi.sequenceID)
1283+
p.log.Warnf("Received ack for ledgerId:%d entryId:%d on sequenceId %v - expected: %v, local > remote, ignore it",
1284+
int64(*response.GetMessageId().LedgerId), int64(*response.GetMessageId().EntryId), response.GetSequenceId(), pi.sequenceID)
12851285
return
12861286
} else {
12871287
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback

pulsar/producer_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2357,6 +2357,36 @@ func TestFailPendingMessageWithClose(t *testing.T) {
23572357
assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
23582358
}
23592359

2360+
func TestProducerSendDuplicatedMessages(t *testing.T) {
2361+
client, err := NewClient(ClientOptions{
2362+
URL: lookupURL,
2363+
})
2364+
assert.NoError(t, err)
2365+
defer client.Close()
2366+
testProducer, err := client.CreateProducer(ProducerOptions{
2367+
Topic: newTopicName(),
2368+
})
2369+
2370+
assert.NoError(t, err)
2371+
assert.NotNil(t, testProducer)
2372+
_, err = testProducer.Send(context.Background(), &ProducerMessage{
2373+
Payload: make([]byte, 1024),
2374+
})
2375+
assert.NoError(t, err)
2376+
for i := 0; i < 3; i++ {
2377+
var seqId int64 = 0
2378+
msgId, err := testProducer.Send(context.Background(), &ProducerMessage{
2379+
Payload: make([]byte, 1024),
2380+
SequenceID: &seqId,
2381+
})
2382+
assert.NoError(t, err)
2383+
assert.NotNil(t, msgId)
2384+
assert.Equal(t, int64(-1), msgId.LedgerID())
2385+
assert.Equal(t, int64(-1), msgId.EntryID())
2386+
}
2387+
testProducer.Close()
2388+
}
2389+
23602390
type pendingQueueWrapper struct {
23612391
pendingQueue internal.BlockingQueue
23622392
writtenBuffers *[]internal.Buffer

0 commit comments

Comments
 (0)