Skip to content

Commit c82e899

Browse files
committed
Fix possible zombie consumer when closing after reconnection
1 parent 3be5267 commit c82e899

File tree

3 files changed

+102
-21
lines changed

3 files changed

+102
-21
lines changed

lib/ConsumerImpl.cc

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
255255
if (duringSeek()) {
256256
ackGroupingTrackerPtr_->flushAndClean();
257257
}
258+
// Set connection after flushing the ack tracker to avoid sending ACKs on the new connection
259+
setCnx(cnx);
258260

259261
Lock lockForMessageId(mutexForMessageId_);
260262
clearReceiveQueue();
@@ -312,7 +314,6 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
312314
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
313315
{
314316
Lock mutexLock(mutex_);
315-
setCnx(cnx);
316317
incomingMessages_.clear();
317318
possibleSendToDeadLetterTopicMessages_.clear();
318319
state_ = Ready;
@@ -338,8 +339,14 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
338339
} else if (messageListener_) {
339340
sendFlowPermitsToBroker(cnx, 1);
340341
}
342+
{
343+
std::lock_guard<std::mutex> lock{subscribeMutex_};
344+
subscribePromise_.setValue(true); // trigger pending requests like seek or getLastMessageId
345+
}
341346
consumerCreatedPromise_.setValue(get_shared_this_ptr());
342347
} else {
348+
// TODO: it's not tested yet because it's hard to mock subscribe failure after reconnection
349+
subscribePromise_.setFailed(result);
343350
if (result == ResultTimeout) {
344351
// Creating the consumer has timed out. We need to ensure the broker closes the consumer
345352
// in case it was indeed created, otherwise it might prevent new subscribe operation,
@@ -1311,6 +1318,10 @@ void ConsumerImpl::disconnectConsumer(const boost::optional<std::string>& assign
13111318
LOG_INFO("Broker notification of Closed consumer: "
13121319
<< consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + assignedBrokerUrl.get()) : ""));
13131320
resetCnx();
1321+
{
1322+
std::lock_guard<std::mutex> lock{subscribeMutex_};
1323+
subscribePromise_ = Promise<Result, bool>{};
1324+
}
13141325
scheduleReconnection(assignedBrokerUrl);
13151326
}
13161327

@@ -1542,13 +1553,23 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb
15421553
return;
15431554
}
15441555

1545-
ClientImplPtr client = client_.lock();
1546-
if (!client) {
1547-
LOG_ERROR(getName() << "Client is expired when seekAsync " << msgId);
1548-
return;
1549-
}
1550-
const auto requestId = client->newRequestId();
1551-
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
1556+
auto self = shared_from_this();
1557+
sendRequestAfterSubscribed([this, self, msgId, callback](Result result) {
1558+
if (result != ResultOk) {
1559+
if (callback) {
1560+
callback(result);
1561+
}
1562+
return;
1563+
}
1564+
ClientImplPtr client = client_.lock();
1565+
if (!client) {
1566+
LOG_ERROR(getName() << "Client is expired when seekAsync " << msgId);
1567+
return;
1568+
}
1569+
const auto requestId = client->newRequestId();
1570+
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId},
1571+
callback);
1572+
});
15521573
}
15531574

15541575
void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) {
@@ -1561,14 +1582,23 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback)
15611582
return;
15621583
}
15631584

1564-
ClientImplPtr client = client_.lock();
1565-
if (!client) {
1566-
LOG_ERROR(getName() << "Client is expired when seekAsync " << timestamp);
1567-
return;
1568-
}
1569-
const auto requestId = client->newRequestId();
1570-
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
1571-
callback);
1585+
auto self = shared_from_this();
1586+
sendRequestAfterSubscribed([this, self, timestamp, callback](Result result) {
1587+
if (result != ResultOk) {
1588+
if (callback) {
1589+
callback(result);
1590+
}
1591+
return;
1592+
}
1593+
ClientImplPtr client = client_.lock();
1594+
if (!client) {
1595+
LOG_ERROR(getName() << "Client is expired when seekAsync " << timestamp);
1596+
return;
1597+
}
1598+
const auto requestId = client->newRequestId();
1599+
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
1600+
callback);
1601+
});
15721602
}
15731603

15741604
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1645,11 +1675,20 @@ void ConsumerImpl::getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& c
16451675
return;
16461676
}
16471677

1648-
TimeDuration operationTimeout = seconds(client_.lock()->conf().getOperationTimeoutSeconds());
1649-
BackoffPtr backoff = std::make_shared<Backoff>(milliseconds(100), operationTimeout * 2, milliseconds(0));
1650-
DeadlineTimerPtr timer = executor_->createDeadlineTimer();
1651-
1652-
internalGetLastMessageIdAsync(backoff, operationTimeout, timer, callback);
1678+
auto self = shared_from_this();
1679+
sendRequestAfterSubscribed([this, self, callback](Result result) {
1680+
if (result != ResultOk) {
1681+
if (callback) {
1682+
callback(result, {});
1683+
}
1684+
return;
1685+
}
1686+
TimeDuration operationTimeout = seconds(client_.lock()->conf().getOperationTimeoutSeconds());
1687+
BackoffPtr backoff =
1688+
std::make_shared<Backoff>(milliseconds(100), operationTimeout * 2, milliseconds(0));
1689+
DeadlineTimerPtr timer = executor_->createDeadlineTimer();
1690+
internalGetLastMessageIdAsync(backoff, operationTimeout, timer, callback);
1691+
});
16531692
}
16541693

16551694
void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,

lib/ConsumerImpl.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,21 @@ class ConsumerImpl : public ConsumerImplBase {
266266
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
267267
std::atomic<bool> hasSoughtByTimestamp_{false};
268268

269+
mutable std::mutex subscribeMutex_;
270+
Promise<Result, bool> subscribePromise_;
271+
272+
template <typename SendRequestFunc>
273+
inline void sendRequestAfterSubscribed(SendRequestFunc&& sendRequest) {
274+
Lock lock{subscribeMutex_};
275+
if (subscribePromise_.isComplete()) {
276+
lock.unlock();
277+
sendRequest(ResultOk);
278+
} else {
279+
subscribePromise_.getFuture().addListener(
280+
[sendRequest](Result result, bool) { sendRequest(result); });
281+
}
282+
}
283+
269284
bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); }
270285
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
271286

tests/ConsumerTest.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,4 +1560,31 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
15601560
ASSERT_EQ(ResultOk, client.close());
15611561
}
15621562

1563+
TEST(ConsumerTest, testCloseAfterSeek) {
1564+
const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr));
1565+
const auto subscription = "sub";
1566+
Client client(lookupUrl);
1567+
Consumer consumer;
1568+
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
1569+
ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
1570+
ASSERT_EQ(ResultOk, consumer.close());
1571+
1572+
// Test the previous consumer will be closed even after seek is done, at the moment the connection might
1573+
// not be established.
1574+
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
1575+
client.close();
1576+
}
1577+
1578+
TEST(ConsumerTest, testSeekAfterSeek) {
1579+
const auto topic = "test-seek-after-seek-" + std::to_string(time(nullptr));
1580+
const auto subscription = "sub";
1581+
Client client(lookupUrl);
1582+
Consumer consumer;
1583+
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
1584+
ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
1585+
ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest()));
1586+
ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
1587+
client.close();
1588+
}
1589+
15631590
} // namespace pulsar

0 commit comments

Comments
 (0)