@@ -1113,7 +1113,11 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
11131113 */
11141114void ConsumerImpl::clearReceiveQueue () {
11151115 if (duringSeek ()) {
1116- if (!hasSoughtByTimestamp_.load (std::memory_order_acquire)) {
1116+ if (hasSoughtByTimestamp ()) {
1117+ // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
1118+ // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
1119+ startMessageId_ = boost::none;
1120+ } else {
11171121 startMessageId_ = seekMessageId_.get ();
11181122 }
11191123 SeekStatus expected = SeekStatus::COMPLETED;
@@ -1578,10 +1582,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
15781582 {
15791583 std::lock_guard<std::mutex> lock{mutexForMessageId_};
15801584 compareMarkDeletePosition =
1581- (lastDequedMessageId_ == MessageId::earliest ()) &&
1582- (startMessageId_.get ().value_or (MessageId::earliest ()) == MessageId::latest ());
1583- }
1584- if (compareMarkDeletePosition || hasSoughtByTimestamp_.load (std::memory_order_acquire)) {
1585+ // there is no message received by consumer, so we cannot compare the last position with the last
1586+ // received position
1587+ lastDequedMessageId_ == MessageId::earliest () &&
1588+ // If the start message id is latest, we should seek to the actual last message first.
1589+ (startMessageId_.get ().value_or (MessageId::earliest ()) == MessageId::latest () ||
1590+ // If there is a previous seek operation by timestamp, the start message id will be incorrect, so
1591+ // we cannot compare the start position with the last position.
1592+ hasSoughtByTimestamp ());
1593+ }
1594+ if (compareMarkDeletePosition) {
15851595 auto self = get_shared_this_ptr ();
15861596 getLastMessageIdAsync ([self, callback](Result result, const GetLastMessageIdResponse& response) {
15871597 if (result != ResultOk) {
@@ -1600,8 +1610,7 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
16001610 callback (ResultOk, false );
16011611 }
16021612 };
1603- if (self->config_ .isStartMessageIdInclusive () &&
1604- !self->hasSoughtByTimestamp_ .load (std::memory_order_acquire)) {
1613+ if (self->config_ .isStartMessageIdInclusive () && !self->hasSoughtByTimestamp ()) {
16051614 self->seekAsync (response.getLastMessageId (), [callback, handleResponse](Result result) {
16061615 if (result != ResultOk) {
16071616 callback (result, {});
@@ -1766,7 +1775,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17661775 // It's during reconnection, complete the seek future after connection is established
17671776 seekStatus_ = SeekStatus::COMPLETED;
17681777 } else {
1769- if (!hasSoughtByTimestamp_. load (std::memory_order_acquire )) {
1778+ if (!hasSoughtByTimestamp ( )) {
17701779 startMessageId_ = seekMessageId_.get ();
17711780 }
17721781 seekCallback_.release ()(result);
0 commit comments