
Commit 875f6ba

fix: seek race (#1265)
* fix: use same goroutine to perform reconnect and seek
* fix: pause dispatch of messages before performing a seek
* use chan struct{} instead of chan bool
* add a log line when seeking
1 parent ccaf552 · commit 875f6ba

2 files changed: +76 -55 lines

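The commit message above summarizes the approach: funnel reconnect and seek through a single event-loop goroutine so the two can no longer interleave, and have the parent consumer pause the dispatcher before repositioning. The following is only a rough, self-contained Go sketch of that serialization pattern; the type and channel names are invented for the example and are not the client's real internals (those appear in the diffs below).

package main

import (
	"fmt"
	"time"
)

// seekRequest is a hypothetical stand-in for the client's internal seek event.
type seekRequest struct {
	id   int64
	done chan error
}

// eventLoop is a toy version of the partition consumer's event loop: reconnects
// and seeks are both handled here, on one goroutine, so they cannot race.
type eventLoop struct {
	eventsCh   chan interface{}
	connClosed chan struct{}
	closeCh    chan struct{}
}

func (l *eventLoop) run() {
	for {
		select {
		case <-l.closeCh:
			return
		case <-l.connClosed:
			// Reconnect runs on the same goroutine that handles seek
			// requests, so the two operations are serialized by design.
			fmt.Println("reconnecting to broker")
		case ev := <-l.eventsCh:
			switch v := ev.(type) {
			case *seekRequest:
				fmt.Println("seeking to message", v.id)
				v.done <- nil
			}
		}
	}
}

func main() {
	l := &eventLoop{
		eventsCh:   make(chan interface{}, 10),
		connClosed: make(chan struct{}, 1),
		closeCh:    make(chan struct{}),
	}
	go l.run()

	// A caller requests a seek by posting an event and waiting for the reply.
	req := &seekRequest{id: 42, done: make(chan error, 1)}
	l.eventsCh <- req
	fmt.Println("seek finished:", <-req.done)

	l.connClosed <- struct{}{} // simulate a dropped connection
	time.Sleep(50 * time.Millisecond)
	close(l.closeCh)
}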

pulsar/consumer_impl.go

Lines changed: 12 additions & 10 deletions
@@ -712,22 +712,29 @@ func (c *consumer) Seek(msgID MessageID) error {
 		return err
 	}
 
-	if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil {
-		return err
-	}
-
+	consumer := c.consumers[msgID.PartitionIdx()]
+	consumer.pauseDispatchMessage()
 	// clear messageCh
 	for len(c.messageCh) > 0 {
 		<-c.messageCh
 	}
 
-	return nil
+	return consumer.Seek(msgID)
 }
 
 func (c *consumer) SeekByTime(time time.Time) error {
 	c.Lock()
 	defer c.Unlock()
 	var errs error
+
+	for _, cons := range c.consumers {
+		cons.pauseDispatchMessage()
+	}
+	// clear messageCh
+	for len(c.messageCh) > 0 {
+		<-c.messageCh
+	}
+
 	// run SeekByTime on every partition of topic
 	for _, cons := range c.consumers {
 		if err := cons.SeekByTime(time); err != nil {
@@ -736,11 +743,6 @@ func (c *consumer) SeekByTime(time time.Time) error {
 		}
 	}
 
-	// clear messageCh
-	for len(c.messageCh) > 0 {
-		<-c.messageCh
-	}
-
 	return errs
 }
 
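From the application side, the affected path is Consumer.Seek (and SeekByTime) called while a receive loop is running: the consumer now pauses dispatch and discards anything already buffered in its channel before forwarding the seek to the partition consumer. A hedged usage sketch against the public pulsar-client-go API follows; the broker URL, topic, and subscription name are placeholders.

package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder broker address
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-topic", // placeholder topic
		SubscriptionName: "my-sub",   // placeholder subscription
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Receive a few messages, remember one ID, then rewind to it.
	var rewindTo pulsar.MessageID
	for i := 0; i < 3; i++ {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		if i == 0 {
			rewindTo = msg.ID()
		}
		consumer.Ack(msg)
	}

	// With this commit, Seek pauses dispatch and clears the consumer's
	// buffered channel before repositioning, so messages prefetched under
	// the old position are not delivered after the seek.
	if err := consumer.Seek(rewindTo); err != nil {
		log.Fatal(err)
	}
}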

pulsar/consumer_partition.go

Lines changed: 64 additions & 45 deletions
@@ -24,6 +24,7 @@ import (
 	"math"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/backoff"
@@ -185,6 +186,16 @@ type partitionConsumer struct {
 
 	redirectedClusterURI string
 	backoffPolicyFunc    func() backoff.Policy
+
+	dispatcherSeekingControlCh chan struct{}
+	isSeeking                  atomic.Bool
+}
+
+// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
+// This method will be called When the parent consumer performs the seek operation.
+// After the seek operation, the dispatcher will continue dispatching messages automatically.
+func (pc *partitionConsumer) pauseDispatchMessage() {
+	pc.dispatcherSeekingControlCh <- struct{}{}
 }
 
 func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
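pauseDispatchMessage signals the dispatcher over dispatcherSeekingControlCh, which the constructor below creates unbuffered; the send therefore only returns once the dispatcher's select has taken the signal, i.e. once it is no longer in the middle of delivering a message. The switch from chan bool to chan struct{} noted in the commit message is presumably just because the signal carries no payload. A small standalone sketch of that handshake, with invented names:

package main

import (
	"fmt"
	"time"
)

func main() {
	pauseCh := make(chan struct{}) // unbuffered, like dispatcherSeekingControlCh

	// Toy dispatcher: delivers "messages" until it receives a pause signal.
	go func() {
		deliver := time.NewTicker(time.Millisecond) // stands in for queued messages
		defer deliver.Stop()
		for {
			select {
			case <-deliver.C:
				// simulate handing one message to the application
			case <-pauseCh:
				fmt.Println("dispatcher: entering seek mode, discarding queued messages")
				return
			}
		}
	}()

	// Because pauseCh is unbuffered, this send completes only when the
	// dispatcher goroutine executes the matching receive in its select,
	// so the caller knows dispatch is actually paused before it seeks.
	pauseCh <- struct{}{}
	fmt.Println("caller: dispatcher acknowledged the pause, safe to seek")
}
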
@@ -329,27 +340,28 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 	}
 
 	pc := &partitionConsumer{
-		parentConsumer:       parent,
-		client:               client,
-		options:              options,
-		topic:                options.topic,
-		name:                 options.consumerName,
-		consumerID:           client.rpcClient.NewConsumerID(),
-		partitionIdx:         int32(options.partitionIdx),
-		eventsCh:             make(chan interface{}, 10),
-		maxQueueSize:         int32(options.receiverQueueSize),
-		queueCh:              make(chan *message, options.receiverQueueSize),
-		startMessageID:       atomicMessageID{msgID: options.startMessageID},
-		connectedCh:          make(chan struct{}),
-		messageCh:            messageCh,
-		connectClosedCh:      make(chan *connectionClosed, 1),
-		closeCh:              make(chan struct{}),
-		clearQueueCh:         make(chan func(id *trackingMessageID)),
-		compressionProviders: sync.Map{},
-		dlq:                  dlq,
-		metrics:              metrics,
-		schemaInfoCache:      newSchemaInfoCache(client, options.topic),
-		backoffPolicyFunc:    boFunc,
+		parentConsumer:             parent,
+		client:                     client,
+		options:                    options,
+		topic:                      options.topic,
+		name:                       options.consumerName,
+		consumerID:                 client.rpcClient.NewConsumerID(),
+		partitionIdx:               int32(options.partitionIdx),
+		eventsCh:                   make(chan interface{}, 10),
+		maxQueueSize:               int32(options.receiverQueueSize),
+		queueCh:                    make(chan *message, options.receiverQueueSize),
+		startMessageID:             atomicMessageID{msgID: options.startMessageID},
+		connectedCh:                make(chan struct{}),
+		messageCh:                  messageCh,
+		connectClosedCh:            make(chan *connectionClosed, 1),
+		closeCh:                    make(chan struct{}),
+		clearQueueCh:               make(chan func(id *trackingMessageID)),
+		compressionProviders:       sync.Map{},
+		dlq:                        dlq,
+		metrics:                    metrics,
+		schemaInfoCache:            newSchemaInfoCache(client, options.topic),
+		backoffPolicyFunc:          boFunc,
+		dispatcherSeekingControlCh: make(chan struct{}),
 	}
 	if pc.options.autoReceiverQueueSize {
 		pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -1440,17 +1452,20 @@ func (pc *partitionConsumer) dispatcher() {
 			}
 			nextMessageSize = queueMsg.size()
 
-			if pc.dlq.shouldSendToDlq(&nextMessage) {
-				// pass the message to the DLQ router
-				pc.metrics.DlqCounter.Inc()
-				messageCh = pc.dlq.Chan()
+			if !pc.isSeeking.Load() {
+				if pc.dlq.shouldSendToDlq(&nextMessage) {
+					// pass the message to the DLQ router
+					pc.metrics.DlqCounter.Inc()
+					messageCh = pc.dlq.Chan()
+				} else {
+					// pass the message to application channel
+					messageCh = pc.messageCh
+				}
+				pc.metrics.PrefetchedMessages.Dec()
+				pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
 			} else {
-				// pass the message to application channel
-				messageCh = pc.messageCh
+				pc.log.Debug("skip dispatching messages when seeking")
 			}
-
-			pc.metrics.PrefetchedMessages.Dec()
-			pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
 		} else {
 			queueCh = pc.queueCh
 		}
@@ -1483,6 +1498,13 @@
 				pc.log.WithError(err).Error("unable to send initial permits to broker")
 			}
 
+		case _, ok := <-pc.dispatcherSeekingControlCh:
+			if !ok {
+				return
+			}
+			pc.log.Debug("received dispatcherSeekingControlCh, set isSeek to true")
+			pc.isSeeking.Store(true)
+
 		case msg, ok := <-queueCh:
 			if !ok {
 				return
@@ -1587,22 +1609,16 @@ func (pc *partitionConsumer) runEventsLoop() {
 	}()
 	pc.log.Debug("get into runEventsLoop")
 
-	go func() {
-		for {
-			select {
-			case <-pc.closeCh:
-				pc.log.Info("close consumer, exit reconnect")
-				return
-			case connectionClosed := <-pc.connectClosedCh:
-				pc.log.Debug("runEventsLoop will reconnect")
-				pc.reconnectToBroker(connectionClosed)
-			}
-		}
-	}()
-
 	for {
-		for i := range pc.eventsCh {
-			switch v := i.(type) {
+		select {
+		case <-pc.closeCh:
+			pc.log.Info("close consumer, exit reconnect")
+			return
+		case connectionClosed := <-pc.connectClosedCh:
+			pc.log.Debug("runEventsLoop will reconnect")
+			pc.reconnectToBroker(connectionClosed)
+		case event := <-pc.eventsCh:
+			switch v := event.(type) {
 			case *ackRequest:
 				pc.internalAck(v)
 			case *ackWithTxnRequest:
@@ -1680,6 +1696,9 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
+	if pc.isSeeking.CompareAndSwap(true, false) {
+		pc.log.Debug("seek operation triggers reconnection, and reset isSeeking")
+	}
 	var (
 		maxRetry                                    int
 		delayReconnectTime, totalDelayReconnectTime time.Duration
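
reconnectToBroker clears the flag with atomic.Bool's CompareAndSwap, so isSeeking is reset (and the debug line emitted) only if a seek was actually pending when the reconnect fired. A tiny standalone illustration of that reset-once behaviour (sync/atomic.Bool requires Go 1.19 or newer):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var isSeeking atomic.Bool
	isSeeking.Store(true) // the dispatcher was told a seek is in flight

	// First reset wins: CompareAndSwap flips true -> false and reports true.
	if isSeeking.CompareAndSwap(true, false) {
		fmt.Println("seek triggered this reconnect; isSeeking reset")
	}

	// A second attempt sees false and does nothing, so the reset (and the
	// accompanying debug log in the real code) happens at most once.
	if isSeeking.CompareAndSwap(true, false) {
		fmt.Println("this is not printed")
	}
	fmt.Println("isSeeking:", isSeeking.Load())
}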
