Skip to content

Commit 6bfa2f5

Browse files
committed
Fix hasMessageAvailable incorrectly returns true when read to latest after seeking by timestamp
1 parent 9b86f9e commit 6bfa2f5

File tree

3 files changed

+28
-8
lines changed

3 files changed

+28
-8
lines changed

lib/ConsumerImpl.cc

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,7 +1113,11 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
11131113
*/
11141114
void ConsumerImpl::clearReceiveQueue() {
11151115
if (duringSeek()) {
1116-
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1116+
if (hasSoughtByTimestamp()) {
1117+
// Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
1118+
// skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
1119+
startMessageId_ = boost::none;
1120+
} else {
11171121
startMessageId_ = seekMessageId_.get();
11181122
}
11191123
SeekStatus expected = SeekStatus::COMPLETED;
@@ -1578,10 +1582,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
15781582
{
15791583
std::lock_guard<std::mutex> lock{mutexForMessageId_};
15801584
compareMarkDeletePosition =
1581-
(lastDequedMessageId_ == MessageId::earliest()) &&
1582-
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
1583-
}
1584-
if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1585+
// there is no message received by consumer, so we cannot compare the last position with the last
1586+
// received position
1587+
lastDequedMessageId_ == MessageId::earliest() &&
1588+
// If the start message id is latest, we should seek to the actual last message first.
1589+
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest() ||
1590+
// If there is a previous seek operation by timestamp, the start message id will be incorrect, so
1591+
// we cannot compare the start positin with the last position.
1592+
hasSoughtByTimestamp());
1593+
}
1594+
if (compareMarkDeletePosition) {
15851595
auto self = get_shared_this_ptr();
15861596
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
15871597
if (result != ResultOk) {
@@ -1600,8 +1610,7 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
16001610
callback(ResultOk, false);
16011611
}
16021612
};
1603-
if (self->config_.isStartMessageIdInclusive() &&
1604-
!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1613+
if (self->config_.isStartMessageIdInclusive() && !self->hasSoughtByTimestamp()) {
16051614
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
16061615
if (result != ResultOk) {
16071616
callback(result, {});
@@ -1766,7 +1775,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17661775
// It's during reconnection, complete the seek future after connection is established
17671776
seekStatus_ = SeekStatus::COMPLETED;
17681777
} else {
1769-
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1778+
if (!hasSoughtByTimestamp()) {
17701779
startMessageId_ = seekMessageId_.get();
17711780
}
17721781
seekCallback_.release()(result);

lib/ConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ class ConsumerImpl : public ConsumerImplBase {
266266
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
267267
std::atomic<bool> hasSoughtByTimestamp_{false};
268268

269+
bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); }
269270
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
270271

271272
class ChunkedMessageCtx {

tests/ReaderTest.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,16 @@ TEST_F(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
885885
createReader(reader, msgId);
886886
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
887887
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
888+
889+
bool hasMessageAvailable;
890+
while (true) {
891+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
892+
if (!hasMessageAvailable) {
893+
break;
894+
}
895+
Message msg;
896+
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
897+
}
888898
}
889899

890900
// Test `hasMessageAvailableAsync` will complete immediately if the incoming message queue is non-empty

0 commit comments

Comments
 (0)