Skip to content

Commit 9aeeb4e

Browse files
committed
Fix wrong result of hasNext after seeking by id or time
1 parent 4e71a47 commit 9aeeb4e

File tree

4 files changed

+235
-36
lines changed

4 files changed

+235
-36
lines changed

pulsar/consumer_partition.go

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,13 @@ type partitionConsumer struct {
190190
backoffPolicyFunc func() backoff.Policy
191191

192192
dispatcherSeekingControlCh chan struct{}
193-
isSeeking atomic.Bool
194-
ctx context.Context
195-
cancelFunc context.CancelFunc
193+
// handle to the dispatcher goroutine
194+
isSeeking atomic.Bool
195+
// After executing seekByTime, the client is unaware of the startMessageId.
196+
// Use this flag to compare markDeletePosition with BrokerLastMessageId when checking hasMoreMessages.
197+
hasSoughtByTime atomic.Bool
198+
ctx context.Context
199+
cancelFunc context.CancelFunc
196200
}
197201

198202
// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
@@ -429,11 +433,12 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
429433

430434
startingMessageID := pc.startMessageID.get()
431435
if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
432-
msgID, err := pc.requestGetLastMessageID()
436+
msgIDResp, err := pc.requestGetLastMessageID()
433437
if err != nil {
434438
pc.Close()
435439
return nil, err
436440
}
441+
msgID := convertToMessageID(msgIDResp.GetLastMessageId())
437442
if msgID.entryID != noMessageEntry {
438443
pc.startMessageID.set(msgID)
439444

@@ -616,18 +621,24 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
616621
}
617622

618623
func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
624+
res, err := pc.getLastMessageIDAndMarkDeletePosition()
625+
return res.msgID, err
626+
}
627+
628+
func (pc *partitionConsumer) getLastMessageIDAndMarkDeletePosition() (*getLastMsgResult, error) {
619629
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
620630
pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
621631
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
622632
}
623633
bo := pc.backoffPolicyFunc()
624-
request := func() (*trackingMessageID, error) {
634+
request := func() (*getLastMsgResult, error) {
625635
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
626636
pc.eventsCh <- req
627637

628638
// wait for the request to complete
629639
<-req.doneCh
630-
return req.msgID, req.err
640+
res := &getLastMsgResult{req.msgID, req.markDeletePosition}
641+
return res, req.err
631642
}
632643

633644
ctx, cancel := context.WithTimeout(context.Background(), pc.client.operationTimeout)
@@ -647,10 +658,16 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
647658

648659
func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
649660
defer close(req.doneCh)
650-
req.msgID, req.err = pc.requestGetLastMessageID()
661+
rsp, err := pc.requestGetLastMessageID()
662+
if err != nil {
663+
req.err = err
664+
return
665+
}
666+
req.msgID = convertToMessageID(rsp.GetLastMessageId())
667+
req.markDeletePosition = convertToMessageID(rsp.GetConsumerMarkDeletePosition())
651668
}
652669

653-
func (pc *partitionConsumer) requestGetLastMessageID() (*trackingMessageID, error) {
670+
func (pc *partitionConsumer) requestGetLastMessageID() (*pb.CommandGetLastMessageIdResponse, error) {
654671
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
655672
pc.log.WithField("state", state).Error("Failed to getLastMessageID closing or closed consumer")
656673
return nil, errors.New("failed to getLastMessageID closing or closed consumer")
@@ -667,8 +684,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (*trackingMessageID, erro
667684
pc.log.WithError(err).Error("Failed to get last message id")
668685
return nil, err
669686
}
670-
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
671-
return convertToMessageID(id), nil
687+
return res.Response.GetLastMessageIdResponse, nil
672688
}
673689

674690
func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
@@ -997,7 +1013,14 @@ func (pc *partitionConsumer) requestSeek(msgID *messageID) error {
9971013
if err := pc.requestSeekWithoutClear(msgID); err != nil {
9981014
return err
9991015
}
1000-
pc.clearReceiverQueue()
1016+
// When the seek operation is successful, it indicates:
1017+
// 1. The broker has reset the cursor and sent a request to close the consumer on the client side.
1018+
// Since this method is in the same goroutine as the reconnectToBroker,
1019+
// we can safely clear the messages in the queue (at this point, it won't contain messages after the seek).
1020+
// 2. The startMessageID is reset to ensure accurate judgment when calling hasNext next time.
1021+
// Since the messages in the queue are cleared here reconnection won't reset startMessageId.
1022+
pc.lastDequeuedMsg = nil
1023+
pc.startMessageID.set(toTrackingMessageID(msgID))
10011024
return nil
10021025
}
10031026

@@ -1069,7 +1092,9 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
10691092
seek.err = err
10701093
return
10711094
}
1072-
pc.clearReceiverQueue()
1095+
pc.clearQueueAndGetNextMessage()
1096+
pc.lastDequeuedMsg = nil
1097+
pc.hasSoughtByTime.Store(true)
10731098
}
10741099

10751100
func (pc *partitionConsumer) internalAck(req *ackRequest) {
@@ -1451,10 +1476,6 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *trackingMessageID)
14511476
if pc.startMessageID.get() == nil {
14521477
return false
14531478
}
1454-
// if we start at latest message, we should never discard
1455-
if pc.options.startMessageID != nil && pc.options.startMessageID.equal(latestMessageID) {
1456-
return false
1457-
}
14581479

14591480
if pc.options.startMessageIDInclusive {
14601481
return pc.startMessageID.get().greater(msgID.messageID)
@@ -1709,9 +1730,15 @@ type redeliveryRequest struct {
17091730
}
17101731

17111732
type getLastMsgIDRequest struct {
1712-
doneCh chan struct{}
1713-
msgID *trackingMessageID
1714-
err error
1733+
doneCh chan struct{}
1734+
msgID *trackingMessageID
1735+
markDeletePosition *trackingMessageID
1736+
err error
1737+
}
1738+
1739+
type getLastMsgResult struct {
1740+
msgID *trackingMessageID
1741+
markDeletePosition *trackingMessageID
17151742
}
17161743

17171744
type seekRequest struct {
@@ -2195,6 +2222,24 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
21952222
}
21962223

21972224
func (pc *partitionConsumer) hasNext() bool {
2225+
2226+
// If a seek by time has been performed, then the `startMessageId` becomes irrelevant.
2227+
// We need to compare `markDeletePosition` and `lastMessageId`,
2228+
// and then reset `startMessageID` to `markDeletePosition`.
2229+
if pc.hasSoughtByTime.CompareAndSwap(true, false) {
2230+
res, err := pc.getLastMessageIDAndMarkDeletePosition()
2231+
if err != nil {
2232+
pc.log.WithError(err).Error("Failed to get last message id")
2233+
return false
2234+
}
2235+
pc.lastMessageInBroker = res.msgID
2236+
pc.startMessageID.set(res.markDeletePosition)
2237+
// We only care about comparing ledger ids and entry ids as mark delete position
2238+
// doesn't have other ids such as batch index
2239+
compareResult := pc.lastMessageInBroker.messageID.compareLedgerAndEntryId(pc.startMessageID.get().messageID)
2240+
return compareResult > 0 || (pc.options.startMessageIDInclusive && compareResult == 0)
2241+
}
2242+
21982243
if pc.lastMessageInBroker != nil && pc.hasMoreMessages() {
21992244
return true
22002245
}

pulsar/impl_message.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package pulsar
1919

2020
import (
21+
"cmp"
2122
"errors"
2223
"fmt"
2324
"math"
@@ -147,6 +148,13 @@ func (id *messageID) equal(other *messageID) bool {
147148
id.batchIdx == other.batchIdx
148149
}
149150

151+
func (id *messageID) compareLedgerAndEntryId(other *messageID) int {
152+
if result := cmp.Compare(id.ledgerID, other.ledgerID); result != 0 {
153+
return result
154+
}
155+
return cmp.Compare(id.entryID, other.entryID)
156+
}
157+
150158
func (id *messageID) greaterEqual(other *messageID) bool {
151159
return id.equal(other) || id.greater(other)
152160
}
@@ -184,7 +192,7 @@ func (id *messageID) BatchSize() int32 {
184192
}
185193

186194
func (id *messageID) String() string {
187-
return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
195+
return fmt.Sprintf("%d:%d:%d:%d", id.ledgerID, id.entryID, id.batchIdx, id.partitionIdx)
188196
}
189197

190198
func deserializeMessageID(data []byte) (MessageID, error) {
@@ -204,6 +212,9 @@ func deserializeMessageID(data []byte) (MessageID, error) {
204212
}
205213

206214
func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, batchSize int32) MessageID {
215+
if batchSize == 1 {
216+
batchIdx = -1
217+
}
207218
return &messageID{
208219
ledgerID: ledgerID,
209220
entryID: entryID,
@@ -225,6 +236,9 @@ func fromMessageID(msgID MessageID) *messageID {
225236

226237
func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, batchSize int32,
227238
tracker *ackTracker) *trackingMessageID {
239+
if batchSize == 1 {
240+
batchIdx = -1
241+
}
228242
return &trackingMessageID{
229243
messageID: &messageID{
230244
ledgerID: ledgerID,

pulsar/reader_impl.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -196,19 +196,6 @@ func (r *reader) Close() {
196196
r.metrics.ReadersClosed.Inc()
197197
}
198198

199-
func (r *reader) messageID(msgID MessageID) *trackingMessageID {
200-
mid := toTrackingMessageID(msgID)
201-
202-
partition := int(mid.partitionIdx)
203-
// did we receive a valid partition index?
204-
if partition < 0 {
205-
r.log.Warnf("invalid partition index %d expected", partition)
206-
return nil
207-
}
208-
209-
return mid
210-
}
211-
212199
func (r *reader) Seek(msgID MessageID) error {
213200
r.Lock()
214201
defer r.Unlock()
@@ -218,9 +205,12 @@ func (r *reader) Seek(msgID MessageID) error {
218205
return fmt.Errorf("invalid message id type %T", msgID)
219206
}
220207

221-
mid := r.messageID(msgID)
222-
if mid == nil {
223-
return nil
208+
mid := toTrackingMessageID(msgID)
209+
210+
partition := int(mid.partitionIdx)
211+
if partition < 0 {
212+
r.log.Warnf("invalid partition index %d expected", partition)
213+
return fmt.Errorf("seek msgId must include partitoinIndex")
224214
}
225215

226216
return r.c.Seek(mid)

0 commit comments

Comments
 (0)