From 9bfac71b4279f129cb3c453011cbdc4874c3750e Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 9 Mar 2023 19:34:06 +0800 Subject: [PATCH] Fix seek method maybe lose messages. --- pulsar/consumer_impl.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d19e52237f..77daf78810 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -653,15 +653,17 @@ func (c *consumer) Seek(msgID MessageID) error { return err } - if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil { - return err - } - // clear messageCh + // We need clear messageCh before execution SeekByTime. Otherwise, messages of after seek maybe will be clear. + // Note: Cleaning up messageCh will also drain the queueCh of the sub-consumers for len(c.messageCh) > 0 { <-c.messageCh } + if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil { + return err + } + return nil } @@ -669,6 +671,14 @@ func (c *consumer) SeekByTime(time time.Time) error { c.Lock() defer c.Unlock() var errs error + + // clear messageCh + // We need clear messageCh before execution SeekByTime. Otherwise, messages of after seek maybe will be clear. + // Note: Cleaning up messageCh will also drain the queueCh of the sub-consumers + for len(c.messageCh) > 0 { + <-c.messageCh + } + // run SeekByTime on every partition of topic for _, cons := range c.consumers { if err := cons.SeekByTime(time); err != nil { @@ -676,12 +686,6 @@ func (c *consumer) SeekByTime(time time.Time) error { errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg) } } - - // clear messageCh - for len(c.messageCh) > 0 { - <-c.messageCh - } - return errs }