Skip to content

Commit ba83732

Browse files
authored
Fix wrong result of reader.hasNext/Next after seeking by id or time (#1340)
* Fix wrong result of hasNext after seeking by id or time * fix unit test * Address code reviews. * Address code review * Add annotation to StartMessageIDInclusive
1 parent 3181aaa commit ba83732

File tree

8 files changed

+262
-41
lines changed

8 files changed

+262
-41
lines changed

pulsar/consumer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ type ConsumerOptions struct {
261261
SubscriptionMode SubscriptionMode
262262

263263
// StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included.
264+
// Note: This configuration also affects the seek operation.
264265
// Default is `false` and the consumer will start from the "next" message
265266
StartMessageIDInclusive bool
266267

pulsar/consumer_partition.go

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,13 @@ type partitionConsumer struct {
190190
backoffPolicyFunc func() backoff.Policy
191191

192192
dispatcherSeekingControlCh chan struct{}
193-
isSeeking atomic.Bool
194-
ctx context.Context
195-
cancelFunc context.CancelFunc
193+
// handle to the dispatcher goroutine
194+
isSeeking atomic.Bool
195+
// After executing seekByTime, the client is unaware of the startMessageId.
196+
// Use this flag to compare markDeletePosition with BrokerLastMessageId when checking hasMoreMessages.
197+
hasSoughtByTime atomic.Bool
198+
ctx context.Context
199+
cancelFunc context.CancelFunc
196200
}
197201

198202
// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
@@ -429,11 +433,12 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
429433

430434
startingMessageID := pc.startMessageID.get()
431435
if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
432-
msgID, err := pc.requestGetLastMessageID()
436+
msgIDResp, err := pc.requestGetLastMessageID()
433437
if err != nil {
434438
pc.Close()
435439
return nil, err
436440
}
441+
msgID := convertToMessageID(msgIDResp.GetLastMessageId())
437442
if msgID.entryID != noMessageEntry {
438443
pc.startMessageID.set(msgID)
439444

@@ -616,18 +621,27 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
616621
}
617622

618623
func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
624+
res, err := pc.getLastMessageIDAndMarkDeletePosition()
625+
if err != nil {
626+
return nil, err
627+
}
628+
return res.msgID, err
629+
}
630+
631+
func (pc *partitionConsumer) getLastMessageIDAndMarkDeletePosition() (*getLastMsgIDResult, error) {
619632
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
620633
pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
621634
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
622635
}
623636
bo := pc.backoffPolicyFunc()
624-
request := func() (*trackingMessageID, error) {
637+
request := func() (*getLastMsgIDResult, error) {
625638
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
626639
pc.eventsCh <- req
627640

628641
// wait for the request to complete
629642
<-req.doneCh
630-
return req.msgID, req.err
643+
res := &getLastMsgIDResult{req.msgID, req.markDeletePosition}
644+
return res, req.err
631645
}
632646

633647
ctx, cancel := context.WithTimeout(context.Background(), pc.client.operationTimeout)
@@ -647,10 +661,16 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
647661

648662
func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
649663
defer close(req.doneCh)
650-
req.msgID, req.err = pc.requestGetLastMessageID()
664+
rsp, err := pc.requestGetLastMessageID()
665+
if err != nil {
666+
req.err = err
667+
return
668+
}
669+
req.msgID = convertToMessageID(rsp.GetLastMessageId())
670+
req.markDeletePosition = convertToMessageID(rsp.GetConsumerMarkDeletePosition())
651671
}
652672

653-
func (pc *partitionConsumer) requestGetLastMessageID() (*trackingMessageID, error) {
673+
func (pc *partitionConsumer) requestGetLastMessageID() (*pb.CommandGetLastMessageIdResponse, error) {
654674
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
655675
pc.log.WithField("state", state).Error("Failed to getLastMessageID closing or closed consumer")
656676
return nil, errors.New("failed to getLastMessageID closing or closed consumer")
@@ -667,8 +687,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (*trackingMessageID, erro
667687
pc.log.WithError(err).Error("Failed to get last message id")
668688
return nil, err
669689
}
670-
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
671-
return convertToMessageID(id), nil
690+
return res.Response.GetLastMessageIdResponse, nil
672691
}
673692

674693
func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
@@ -997,7 +1016,15 @@ func (pc *partitionConsumer) requestSeek(msgID *messageID) error {
9971016
if err := pc.requestSeekWithoutClear(msgID); err != nil {
9981017
return err
9991018
}
1000-
pc.clearReceiverQueue()
1019+
// When the seek operation is successful, it indicates:
1020+
// 1. The broker has reset the cursor and sent a request to close the consumer on the client side.
1021+
// Since this method is in the same goroutine as the reconnectToBroker,
1022+
// we can safely clear the messages in the queue (at this point, it won't contain messages after the seek).
1023+
// 2. The startMessageID is reset to ensure accurate judgment when calling hasNext next time.
1024+
// Since the messages in the queue are cleared here reconnection won't reset startMessageId.
1025+
pc.lastDequeuedMsg = nil
1026+
pc.startMessageID.set(toTrackingMessageID(msgID))
1027+
pc.clearQueueAndGetNextMessage()
10011028
return nil
10021029
}
10031030

@@ -1069,7 +1096,9 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
10691096
seek.err = err
10701097
return
10711098
}
1072-
pc.clearReceiverQueue()
1099+
pc.lastDequeuedMsg = nil
1100+
pc.hasSoughtByTime.Store(true)
1101+
pc.clearQueueAndGetNextMessage()
10731102
}
10741103

10751104
func (pc *partitionConsumer) internalAck(req *ackRequest) {
@@ -1451,10 +1480,6 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *trackingMessageID)
14511480
if pc.startMessageID.get() == nil {
14521481
return false
14531482
}
1454-
// if we start at latest message, we should never discard
1455-
if pc.options.startMessageID != nil && pc.options.startMessageID.equal(latestMessageID) {
1456-
return false
1457-
}
14581483

14591484
if pc.options.startMessageIDInclusive {
14601485
return pc.startMessageID.get().greater(msgID.messageID)
@@ -1709,9 +1734,15 @@ type redeliveryRequest struct {
17091734
}
17101735

17111736
type getLastMsgIDRequest struct {
1712-
doneCh chan struct{}
1713-
msgID *trackingMessageID
1714-
err error
1737+
doneCh chan struct{}
1738+
msgID *trackingMessageID
1739+
markDeletePosition *trackingMessageID
1740+
err error
1741+
}
1742+
1743+
type getLastMsgIDResult struct {
1744+
msgID *trackingMessageID
1745+
markDeletePosition *trackingMessageID
17151746
}
17161747

17171748
type seekRequest struct {
@@ -2200,6 +2231,25 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
22002231
}
22012232

22022233
func (pc *partitionConsumer) hasNext() bool {
2234+
2235+
// If a seek by time has been performed, then the `startMessageId` becomes irrelevant.
2236+
// We need to compare `markDeletePosition` and `lastMessageId`,
2237+
// and then reset `startMessageID` to `markDeletePosition`.
2238+
if pc.lastDequeuedMsg == nil && pc.hasSoughtByTime.CompareAndSwap(true, false) {
2239+
res, err := pc.getLastMessageIDAndMarkDeletePosition()
2240+
if err != nil {
2241+
pc.log.WithError(err).Error("Failed to get last message id")
2242+
pc.hasSoughtByTime.CompareAndSwap(false, true)
2243+
return false
2244+
}
2245+
pc.lastMessageInBroker = res.msgID
2246+
pc.startMessageID.set(res.markDeletePosition)
2247+
// We only care about comparing ledger ids and entry ids as mark delete position
2248+
// doesn't have other ids such as batch index
2249+
compareResult := pc.lastMessageInBroker.messageID.compareLedgerAndEntryID(pc.startMessageID.get().messageID)
2250+
return compareResult > 0 || (pc.options.startMessageIDInclusive && compareResult == 0)
2251+
}
2252+
22032253
if pc.lastMessageInBroker != nil && pc.hasMoreMessages() {
22042254
return true
22052255
}
@@ -2261,12 +2311,14 @@ func convertToMessageID(id *pb.MessageIdData) *trackingMessageID {
22612311

22622312
msgID := &trackingMessageID{
22632313
messageID: &messageID{
2264-
ledgerID: int64(*id.LedgerId),
2265-
entryID: int64(*id.EntryId),
2314+
ledgerID: int64(id.GetLedgerId()),
2315+
entryID: int64(id.GetEntryId()),
2316+
batchIdx: id.GetBatchIndex(),
2317+
batchSize: id.GetBatchSize(),
22662318
},
22672319
}
2268-
if id.BatchIndex != nil {
2269-
msgID.batchIdx = *id.BatchIndex
2320+
if msgID.batchIdx == -1 {
2321+
msgID.batchIdx = 0
22702322
}
22712323

22722324
return msgID

pulsar/consumer_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,8 +1262,9 @@ func TestConsumerSeek(t *testing.T) {
12621262
defer producer.Close()
12631263

12641264
consumer, err := client.Subscribe(ConsumerOptions{
1265-
Topic: topicName,
1266-
SubscriptionName: "sub-1",
1265+
Topic: topicName,
1266+
SubscriptionName: "sub-1",
1267+
StartMessageIDInclusive: true,
12671268
})
12681269
assert.Nil(t, err)
12691270
defer consumer.Close()

pulsar/consumer_zero_queue_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ func TestZeroQueueConsumer_Seek(t *testing.T) {
474474
Topic: topicName,
475475
EnableZeroQueueConsumer: true,
476476
SubscriptionName: "sub-1",
477+
StartMessageIDInclusive: true,
477478
})
478479
assert.Nil(t, err)
479480
_, ok := consumer.(*zeroQueueConsumer)

pulsar/impl_message.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package pulsar
1919

2020
import (
21+
"cmp"
2122
"errors"
2223
"fmt"
2324
"math"
@@ -147,6 +148,13 @@ func (id *messageID) equal(other *messageID) bool {
147148
id.batchIdx == other.batchIdx
148149
}
149150

151+
func (id *messageID) compareLedgerAndEntryID(other *messageID) int {
152+
if result := cmp.Compare(id.ledgerID, other.ledgerID); result != 0 {
153+
return result
154+
}
155+
return cmp.Compare(id.entryID, other.entryID)
156+
}
157+
150158
func (id *messageID) greaterEqual(other *messageID) bool {
151159
return id.equal(other) || id.greater(other)
152160
}

pulsar/reader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type ReaderOptions struct {
5353
StartMessageID MessageID
5454

5555
// StartMessageIDInclusive, if true, the reader will start at the `StartMessageID`, included.
56+
// Note: This configuration also affects the seek operation.
5657
// Default is `false` and the reader will start from the "next" message
5758
StartMessageIDInclusive bool
5859

pulsar/reader_impl.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -196,19 +196,6 @@ func (r *reader) Close() {
196196
r.metrics.ReadersClosed.Inc()
197197
}
198198

199-
func (r *reader) messageID(msgID MessageID) *trackingMessageID {
200-
mid := toTrackingMessageID(msgID)
201-
202-
partition := int(mid.partitionIdx)
203-
// did we receive a valid partition index?
204-
if partition < 0 {
205-
r.log.Warnf("invalid partition index %d expected", partition)
206-
return nil
207-
}
208-
209-
return mid
210-
}
211-
212199
func (r *reader) Seek(msgID MessageID) error {
213200
r.Lock()
214201
defer r.Unlock()
@@ -218,9 +205,12 @@ func (r *reader) Seek(msgID MessageID) error {
218205
return fmt.Errorf("invalid message id type %T", msgID)
219206
}
220207

221-
mid := r.messageID(msgID)
222-
if mid == nil {
223-
return nil
208+
mid := toTrackingMessageID(msgID)
209+
210+
partition := int(mid.partitionIdx)
211+
if partition < 0 {
212+
r.log.Warnf("invalid partition index %d expected", partition)
213+
return fmt.Errorf("seek msgId must include partitoinIndex")
224214
}
225215

226216
return r.c.Seek(mid)

0 commit comments

Comments
 (0)