Skip to content

Commit 27d8cc0

Browse files
Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (apache#422)
Fixes apache#420. It's a catch-up for apache/pulsar#22363.
1 parent 763b85c commit 27d8cc0

File tree

3 files changed

+104
-36
lines changed

3 files changed

+104
-36
lines changed

lib/ConsumerImpl.cc

Lines changed: 23 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1050,7 +1050,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
10501050
*/
10511051
void ConsumerImpl::clearReceiveQueue() {
10521052
if (duringSeek()) {
1053-
startMessageId_ = seekMessageId_.get();
1053+
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1054+
startMessageId_ = seekMessageId_.get();
1055+
}
10541056
SeekStatus expected = SeekStatus::COMPLETED;
10551057
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
10561058
auto seekCallback = seekCallback_.release();
@@ -1476,7 +1478,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
14761478
return;
14771479
}
14781480
const auto requestId = client->newRequestId();
1479-
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback);
1481+
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
14801482
}
14811483

14821484
void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
@@ -1495,8 +1497,8 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
14951497
return;
14961498
}
14971499
const auto requestId = client->newRequestId();
1498-
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(),
1499-
timestamp, callback);
1500+
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
1501+
callback);
15001502
}
15011503

15021504
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1509,7 +1511,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
15091511
(lastDequedMessageId_ == MessageId::earliest()) &&
15101512
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
15111513
}
1512-
if (compareMarkDeletePosition) {
1514+
if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
15131515
auto self = get_shared_this_ptr();
15141516
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
15151517
if (result != ResultOk) {
@@ -1518,8 +1520,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
15181520
}
15191521
auto handleResponse = [self, response, callback] {
15201522
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
1521-
// We only care about comparing ledger ids and entry ids as mark delete position doesn't
1522-
// have other ids such as batch index
1523+
// We only care about comparing ledger ids and entry ids as mark delete position
1524+
// doesn't have other ids such as batch index
15231525
auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
15241526
response.getLastMessageId());
15251527
callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
@@ -1528,7 +1530,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
15281530
callback(ResultOk, false);
15291531
}
15301532
};
1531-
if (self->config_.isStartMessageIdInclusive()) {
1533+
if (self->config_.isStartMessageIdInclusive() &&
1534+
!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
15321535
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
15331536
if (result != ResultOk) {
15341537
callback(result, {});
@@ -1644,8 +1647,8 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
16441647

16451648
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
16461649

1647-
void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId,
1648-
long timestamp, ResultCallback callback) {
1650+
void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
1651+
ResultCallback callback) {
16491652
ClientConnectionPtr cnx = getCnx().lock();
16501653
if (!cnx) {
16511654
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
@@ -1655,21 +1658,21 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
16551658

16561659
auto expected = SeekStatus::NOT_STARTED;
16571660
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
1658-
LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " << timestamp << " when the status is "
1661+
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
16591662
<< static_cast<int>(expected));
16601663
callback(ResultNotAllowedError);
16611664
return;
16621665
}
16631666

16641667
const auto originalSeekMessageId = seekMessageId_.get();
1665-
seekMessageId_ = seekId;
1666-
seekStatus_ = SeekStatus::IN_PROGRESS;
1667-
seekCallback_ = std::move(callback);
1668-
if (timestamp > 0) {
1669-
LOG_INFO(getName() << " Seeking subscription to " << timestamp);
1668+
if (boost::get<uint64_t>(&seekArg)) {
1669+
hasSoughtByTimestamp_.store(true, std::memory_order_release);
16701670
} else {
1671-
LOG_INFO(getName() << " Seeking subscription to " << seekId);
1671+
seekMessageId_ = *boost::get<MessageId>(&seekArg);
16721672
}
1673+
seekStatus_ = SeekStatus::IN_PROGRESS;
1674+
seekCallback_ = std::move(callback);
1675+
LOG_INFO(getName() << " Seeking subscription to " << seekArg);
16731676

16741677
std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
16751678

@@ -1692,7 +1695,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
16921695
// It's during reconnection, complete the seek future after connection is established
16931696
seekStatus_ = SeekStatus::COMPLETED;
16941697
} else {
1695-
startMessageId_ = seekMessageId_.get();
1698+
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
1699+
startMessageId_ = seekMessageId_.get();
1700+
}
16961701
seekCallback_.release()(result);
16971702
}
16981703
} else {

lib/ConsumerImpl.h

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/Reader.h>
2323

2424
#include <boost/optional.hpp>
25+
#include <boost/variant.hpp>
2526
#include <functional>
2627
#include <list>
2728
#include <memory>
@@ -201,7 +202,18 @@ class ConsumerImpl : public ConsumerImplBase {
201202
BrokerGetLastMessageIdCallback callback);
202203

203204
void clearReceiveQueue();
204-
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
205+
using SeekArg = boost::variant<uint64_t, MessageId>;
206+
friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) {
207+
auto ptr = boost::get<uint64_t>(&seekArg);
208+
if (ptr) {
209+
os << *ptr;
210+
} else {
211+
os << *boost::get<MessageId>(&seekArg);
212+
}
213+
return os;
214+
}
215+
216+
void seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
205217
ResultCallback callback);
206218
void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb);
207219

@@ -250,6 +262,7 @@ class ConsumerImpl : public ConsumerImplBase {
250262
Synchronized<ResultCallback> seekCallback_{[](Result) {}};
251263
Synchronized<boost::optional<MessageId>> startMessageId_;
252264
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
265+
std::atomic<bool> hasSoughtByTimestamp_{false};
253266

254267
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
255268

tests/ReaderTest.cc

Lines changed: 67 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -700,9 +700,16 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
700700
client.close();
701701
}
702702

703-
TEST(ReaderSeekTest, testSeekForMessageId) {
704-
Client client(serviceUrl);
703+
class ReaderSeekTest : public ::testing::TestWithParam<bool> {
704+
public:
705+
void SetUp() override { client = Client{serviceUrl}; }
705706

707+
void TearDown() override { client.close(); }
708+
709+
Client client{serviceUrl};
710+
};
711+
712+
TEST_F(ReaderSeekTest, testSeekForMessageId) {
706713
const std::string topic = "test-seek-for-message-id-" + std::to_string(time(nullptr));
707714

708715
Producer producer;
@@ -752,18 +759,24 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
752759
producer.close();
753760
}
754761

755-
class ReaderSeekTest : public ::testing::TestWithParam<bool> {};
756-
757-
TEST(ReaderSeekTest, testStartAtLatestMessageId) {
758-
Client client(serviceUrl);
762+
#define EXPECT_HAS_MESSAGE_AVAILABLE(reader, expected) \
763+
{ \
764+
bool actual; \
765+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(actual)); \
766+
EXPECT_EQ(actual, (expected)); \
767+
}
759768

769+
TEST_F(ReaderSeekTest, testStartAtLatestMessageId) {
760770
const std::string topic = "test-seek-latest-message-id-" + std::to_string(time(nullptr));
761771

762772
Producer producer;
763773
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
764774

765775
MessageId id;
766-
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), id));
776+
for (int i = 0; i < 10; i++) {
777+
ASSERT_EQ(ResultOk,
778+
producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id));
779+
}
767780

768781
Reader readerExclusive;
769782
ASSERT_EQ(ResultOk,
@@ -774,20 +787,24 @@ TEST(ReaderSeekTest, testStartAtLatestMessageId) {
774787
client.createReader(topic, MessageId::latest(),
775788
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
776789

790+
EXPECT_HAS_MESSAGE_AVAILABLE(readerExclusive, false);
791+
EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);
792+
777793
Message msg;
778-
bool hasMsgAvaliable = false;
779-
readerInclusive.hasMessageAvailable(hasMsgAvaliable);
780-
ASSERT_TRUE(hasMsgAvaliable);
781794
ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
782-
ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000));
795+
ASSERT_EQ(msg.getDataAsString(), "msg-9");
796+
797+
readerInclusive.seek(0L);
798+
EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);
799+
ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
800+
ASSERT_EQ(msg.getDataAsString(), "msg-0");
783801

784802
readerExclusive.close();
785803
readerInclusive.close();
786804
producer.close();
787805
}
788806

789-
TEST(ReaderTest, testSeekInProgress) {
790-
Client client(serviceUrl);
807+
TEST_F(ReaderSeekTest, testSeekInProgress) {
791808
const auto topic = "test-seek-in-progress-" + std::to_string(time(nullptr));
792809
Reader reader;
793810
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
@@ -798,11 +815,9 @@ TEST(ReaderTest, testSeekInProgress) {
798815
Result result;
799816
promise.getFuture().get(result);
800817
ASSERT_EQ(result, ResultNotAllowedError);
801-
client.close();
802818
}
803819

804820
TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
805-
Client client(serviceUrl);
806821
const auto topic = "test-has-message-available-after-seek-to-end-" + std::to_string(time(nullptr));
807822
Producer producer;
808823
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
@@ -814,7 +829,6 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
814829

815830
bool hasMessageAvailable;
816831
if (GetParam()) {
817-
// Test the case when `ConsumerImpl.lastMessageIdInBroker_` has been initialized
818832
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
819833
}
820834

@@ -834,8 +848,44 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
834848
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
835849
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
836850
ASSERT_FALSE(hasMessageAvailable);
851+
}
837852

838-
client.close();
853+
TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
854+
using namespace std::chrono;
855+
const auto topic = "test-has-message-available-after-seek-timestamp-" + std::to_string(time(nullptr));
856+
Producer producer;
857+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
858+
MessageId sentMsgId;
859+
const auto timestampBeforeSend =
860+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
861+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), sentMsgId));
862+
863+
auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) {
864+
ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
865+
if (GetParam()) {
866+
if (msgId == MessageId::earliest()) {
867+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
868+
} else {
869+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
870+
}
871+
}
872+
};
873+
874+
std::vector<MessageId> msgIds{MessageId::earliest(), sentMsgId, MessageId::latest()};
875+
876+
for (auto&& msgId : msgIds) {
877+
Reader reader;
878+
createReader(reader, msgId);
879+
ASSERT_EQ(ResultOk,
880+
reader.seek(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()));
881+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
882+
}
883+
for (auto&& msgId : msgIds) {
884+
Reader reader;
885+
createReader(reader, msgId);
886+
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
887+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
888+
}
839889
}
840890

841891
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)