Skip to content

Commit 89432d2

Browse files
committed
Reduce unnecessary getLastMessageId RPC in hasMessageAvailable
1 parent d9dd029 commit 89432d2

File tree

3 files changed

+69
-10
lines changed

3 files changed

+69
-10
lines changed

lib/ConsumerImpl.cc

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,9 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
961961
// Can't use break here else it may trigger a race with connection opened.
962962

963963
localLock.unlock();
964+
Lock lock(mutexForMessageId_);
965+
lastDequedMessageId_ = msg.getMessageId();
966+
lock.unlock();
964967
msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
965968
return ResultOk;
966969
}
@@ -1107,9 +1110,7 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
11071110
*/
11081111
void ConsumerImpl::clearReceiveQueue() {
11091112
if (duringSeek()) {
1110-
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1111-
startMessageId_ = seekMessageId_.get();
1112-
}
1113+
trySetStartMessageIdToSeekMessageId();
11131114
SeekStatus expected = SeekStatus::COMPLETED;
11141115
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
11151116
auto seekCallback = seekCallback_.release();
@@ -1561,14 +1562,18 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
15611562
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
15621563

15631564
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
1565+
if (!incomingMessages_.empty()) {
1566+
callback(ResultOk, true);
1567+
return;
1568+
}
15641569
bool compareMarkDeletePosition;
15651570
{
15661571
std::lock_guard<std::mutex> lock{mutexForMessageId_};
15671572
compareMarkDeletePosition =
15681573
(lastDequedMessageId_ == MessageId::earliest()) &&
15691574
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
15701575
}
1571-
if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1576+
if (compareMarkDeletePosition || hasSoughtByTimestamp_.load()) {
15721577
auto self = get_shared_this_ptr();
15731578
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
15741579
if (result != ResultOk) {
@@ -1587,8 +1592,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
15871592
callback(ResultOk, false);
15881593
}
15891594
};
1590-
if (self->config_.isStartMessageIdInclusive() &&
1591-
!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1595+
if (self->config_.isStartMessageIdInclusive() && !self->hasSoughtByTimestamp_.load()) {
15921596
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
15931597
if (result != ResultOk) {
15941598
callback(result, {});
@@ -1723,7 +1727,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Se
17231727

17241728
const auto originalSeekMessageId = seekMessageId_.get();
17251729
if (boost::get<uint64_t>(&seekArg)) {
1726-
hasSoughtByTimestamp_.store(true, std::memory_order_release);
1730+
hasSoughtByTimestamp_.store(true);
17271731
} else {
17281732
seekMessageId_ = *boost::get<MessageId>(&seekArg);
17291733
}
@@ -1752,16 +1756,15 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Se
17521756
// It's during reconnection, complete the seek future after connection is established
17531757
seekStatus_ = SeekStatus::COMPLETED;
17541758
} else {
1755-
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1756-
startMessageId_ = seekMessageId_.get();
1757-
}
1759+
trySetStartMessageIdToSeekMessageId();
17581760
seekCallback_.release()(result);
17591761
}
17601762
} else {
17611763
LOG_ERROR(getName() << "Failed to seek: " << result);
17621764
seekMessageId_ = originalSeekMessageId;
17631765
seekStatus_ = SeekStatus::NOT_STARTED;
17641766
seekCallback_.release()(result);
1767+
hasSoughtByTimestamp_.store(false);
17651768
}
17661769
});
17671770
}

lib/ConsumerImpl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,14 @@ class ConsumerImpl : public ConsumerImplBase {
372372
}
373373
}
374374

375+
void trySetStartMessageIdToSeekMessageId() {
376+
bool expected = true;
377+
if (!hasSoughtByTimestamp_.compare_exchange_strong(expected, false)) {
378+
startMessageId_ = seekMessageId_.get();
379+
} // else: we have sought by timestamp, we should not set startMessageId_ to seekMessageId_
380+
// because it is not a valid message id.
381+
}
382+
375383
friend class PulsarFriend;
376384
friend class MultiTopicsConsumerImpl;
377385

tests/ReaderTest.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
#include <pulsar/Reader.h>
2222
#include <time.h>
2323

24+
#include <chrono>
25+
#include <future>
2426
#include <string>
27+
#include <thread>
2528

2629
#include "HttpHelper.h"
2730
#include "PulsarFriend.h"
@@ -917,5 +920,50 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
917920
assertStartMessageId(false, secondMsgId);
918921
}
919922

923+
// https://github.com/apache/pulsar-client-cpp/pull/422 introduces a regression that after seeking by
924+
// timestamp, each hasMessageAvailable() call will require a getLastMessageId RPC. This test is added to
925+
// verify the regression is fixed.
926+
TEST_F(ReaderSeekTest, testHasMessageAvailableCache) {
927+
const auto topic = "test-has-message-available-cache-" + std::to_string(time(nullptr));
928+
929+
Reader reader;
930+
auto hasMessageAvailableCallbackThreadId = [&reader] {
931+
std::promise<std::thread::id> threadIdPromise;
932+
auto callback = [&threadIdPromise](Result, bool) {
933+
threadIdPromise.set_value(std::this_thread::get_id());
934+
};
935+
reader.hasMessageAvailableAsync(callback);
936+
return threadIdPromise.get_future().get();
937+
};
938+
939+
Producer producer;
940+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
941+
producer.send(MessageBuilder().setContent("msg-0").build());
942+
producer.send(MessageBuilder().setContent("msg-1").build());
943+
944+
// Case 1: when `incomingMessages_` is empty, `hasMessageAvailableAsync` will complete immediately if
945+
// `lastDequedMessageId_` and `lastMessageIdInBroker_` have been updated and there is a message available.
946+
ReaderConfiguration conf;
947+
conf.setReceiverQueueSize(0);
948+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), conf, reader));
949+
ASSERT_EQ(ResultOk, reader.seek(0L));
950+
Message msg;
951+
ASSERT_EQ(ResultOk, reader.readNext(msg));
952+
// Now, `lastDequedMessageId_` is updated to the id of `msg`
953+
bool available;
954+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(available));
955+
// Now the `lastMessageIdInBroker_` is updated
956+
ASSERT_EQ(hasMessageAvailableCallbackThreadId(), std::this_thread::get_id());
957+
958+
// Case 2: when `incomingMessages_` is not empty, `hasMessageAvailableAsync` will complete immediately
959+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
960+
ASSERT_EQ(ResultOk, reader.seek(0L));
961+
ASSERT_EQ(ResultOk, reader.readNext(msg));
962+
waitUntil(std::chrono::seconds(3),
963+
[&reader] { return PulsarFriend::getConsumer(reader)->getNumOfPrefetchedMessages() > 0; });
964+
// Now, `incomingMessages_` is not empty
965+
ASSERT_EQ(hasMessageAvailableCallbackThreadId(), std::this_thread::get_id());
966+
}
967+
920968
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
921969
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)