Skip to content

Commit 3543cb1

Browse files
committed
fix timer cancel invocations
1 parent f05e135 commit 3543cb1

18 files changed

+69
-97
lines changed

lib/AckGroupingTrackerEnabled.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
121121
this->flush();
122122
std::lock_guard<std::mutex> lock(this->mutexTimer_);
123123
if (this->timer_) {
124-
ASIO_ERROR ec;
125-
this->timer_->cancel(ec);
124+
cancelTimer(*this->timer_);
126125
}
127126
}
128127

@@ -172,7 +171,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
172171

173172
std::lock_guard<std::mutex> lock(this->mutexTimer_);
174173
this->timer_ = this->executor_->createDeadlineTimer();
175-
this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
174+
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
176175
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
177176
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
178177
auto self = weakSelf.lock();

lib/AsioTimer.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,12 @@
2929
#include "AsioDefines.h"
3030

3131
using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
32+
33+
inline void cancelTimer(ASIO::steady_timer& timer) {
34+
try {
35+
timer.cancel();
36+
} catch (const ASIO_SYSTEM_ERROR& ignored) {
37+
// Most of the time the exception can be ignored unless the following logic depends on the fact that
38+
// the timer is cancelled.
39+
}
40+
}

lib/ClientConnection.cc

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
317317
// Only send keep-alive probes if the broker supports it
318318
keepAliveTimer_ = executor_->createDeadlineTimer();
319319
if (keepAliveTimer_) {
320-
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
320+
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
321321
auto weakSelf = weak_from_this();
322322
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
323323
auto self = weakSelf.lock();
@@ -362,7 +362,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
362362
// If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
363363
// Check if we have a timer still before we set the request timer to pop again.
364364
if (consumerStatsRequestTimer_) {
365-
consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
365+
consumerStatsRequestTimer_->expires_after(operationsTimeout_);
366366
auto weakSelf = weak_from_this();
367367
consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
368368
auto self = weakSelf.lock();
@@ -621,32 +621,12 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::result
621621
ptr->connectTimeoutTask_->stop();
622622
});
623623

624-
ASIO::async_connect(socket_, results, [weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
624+
ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
625625
auto self = weakSelf.lock();
626626
if (self) {
627627
self->handleTcpConnected(err, endpoint);
628628
}
629629
});
630-
631-
// TODO: use the new resolver results API to iterate over endpoints
632-
/*
633-
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
634-
connectTimeoutTask_->start();
635-
if (endpointIterator != tcp::resolver::iterator()) {
636-
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
637-
<< " to " << endpointIterator->endpoint());
638-
socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
639-
auto self = weakSelf.lock();
640-
if (self) {
641-
self->handleTcpConnected(err, endpointIterator);
642-
}
643-
});
644-
} else {
645-
LOG_WARN(cnxString_ << "No IP address found");
646-
close();
647-
return;
648-
}
649-
*/
650630
}
651631

652632
void ClientConnection::readNextCommand() {
@@ -1050,7 +1030,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
10501030
LookupRequestData requestData;
10511031
requestData.promise = promise;
10521032
requestData.timer = executor_->createDeadlineTimer();
1053-
requestData.timer->expires_from_now(operationsTimeout_);
1033+
requestData.timer->expires_after(operationsTimeout_);
10541034
auto weakSelf = weak_from_this();
10551035
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
10561036
auto self = weakSelf.lock();
@@ -1190,7 +1170,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
11901170

11911171
PendingRequestData requestData;
11921172
requestData.timer = executor_->createDeadlineTimer();
1193-
requestData.timer->expires_from_now(operationsTimeout_);
1173+
requestData.timer->expires_after(operationsTimeout_);
11941174
auto weakSelf = weak_from_this();
11951175
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
11961176
auto self = weakSelf.lock();
@@ -1245,7 +1225,7 @@ void ClientConnection::handleKeepAliveTimeout() {
12451225
// be zero And we do not attempt to dereference the pointer.
12461226
Lock lock(mutex_);
12471227
if (keepAliveTimer_) {
1248-
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
1228+
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
12491229
auto weakSelf = weak_from_this();
12501230
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
12511231
auto self = weakSelf.lock();
@@ -1307,12 +1287,12 @@ void ClientConnection::close(Result result, bool detach) {
13071287
numOfPendingLookupRequest_ = 0;
13081288

13091289
if (keepAliveTimer_) {
1310-
keepAliveTimer_->cancel();
1290+
cancelTimer(*keepAliveTimer_);
13111291
keepAliveTimer_.reset();
13121292
}
13131293

13141294
if (consumerStatsRequestTimer_) {
1315-
consumerStatsRequestTimer_->cancel();
1295+
cancelTimer(*consumerStatsRequestTimer_);
13161296
consumerStatsRequestTimer_.reset();
13171297
}
13181298

@@ -1424,7 +1404,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
14241404
LastMessageIdRequestData requestData;
14251405
requestData.promise = promise;
14261406
requestData.timer = executor_->createDeadlineTimer();
1427-
requestData.timer->expires_from_now(operationsTimeout_);
1407+
requestData.timer->expires_after(operationsTimeout_);
14281408
auto weakSelf = weak_from_this();
14291409
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
14301410
auto self = weakSelf.lock();
@@ -1472,7 +1452,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
14721452
lock.unlock();
14731453

14741454
auto weakSelf = weak_from_this();
1475-
timer->expires_from_now(operationsTimeout_);
1455+
timer->expires_after(operationsTimeout_);
14761456
timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
14771457
auto self = weakSelf.lock();
14781458
if (!self) {
@@ -1559,7 +1539,7 @@ void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
15591539
lock.unlock();
15601540

15611541
requestData.promise.setValue({});
1562-
requestData.timer->cancel();
1542+
cancelTimer(*requestData.timer);
15631543
}
15641544
}
15651545

@@ -1571,7 +1551,8 @@ void ClientConnection::handlePartitionedMetadataResponse(
15711551
Lock lock(mutex_);
15721552
auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id());
15731553
if (it != pendingLookupRequests_.end()) {
1574-
it->second.timer->cancel();
1554+
cancelTimer(*it->second.timer);
1555+
15751556
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
15761557
pendingLookupRequests_.erase(it);
15771558
numOfPendingLookupRequest_--;
@@ -1650,7 +1631,7 @@ void ClientConnection::handleLookupTopicRespose(
16501631
Lock lock(mutex_);
16511632
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
16521633
if (it != pendingLookupRequests_.end()) {
1653-
it->second.timer->cancel();
1634+
cancelTimer(*it->second.timer);
16541635
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
16551636
pendingLookupRequests_.erase(it);
16561637
numOfPendingLookupRequest_--;
@@ -1728,7 +1709,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
17281709
data.topicEpoch = boost::none;
17291710
}
17301711
requestData.promise.setValue(data);
1731-
requestData.timer->cancel();
1712+
cancelTimer(*requestData.timer);
17321713
}
17331714
}
17341715
}
@@ -1748,7 +1729,7 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17481729
lock.unlock();
17491730

17501731
requestData.promise.setFailed(result);
1751-
requestData.timer->cancel();
1732+
cancelTimer(*requestData.timer);
17521733
} else {
17531734
PendingGetLastMessageIdRequestsMap::iterator it =
17541735
pendingGetLastMessageIdRequests_.find(error.request_id());
@@ -2041,8 +2022,8 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
20412022
auto it = pendingRequests_.find(requestId);
20422023
if (it != pendingRequests_.end()) {
20432024
it->second.promise.setFailed(ResultDisconnected);
2044-
ASIO_ERROR ec;
2045-
it->second.timer->cancel(ec);
2025+
cancelTimer(*it->second.timer);
2026+
20462027
pendingRequests_.erase(it);
20472028
}
20482029
}

lib/ConsumerImpl.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ void ConsumerImpl::discardChunkMessages(const std::string& uuid, const MessageId
423423
}
424424

425425
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
426-
checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
426+
checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
427427
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
428428
checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
429429
auto self = weakSelf.lock();
@@ -1690,7 +1690,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
16901690
}
16911691
remainTime -= next;
16921692

1693-
timer->expires_from_now(next);
1693+
timer->expires_after(next);
16941694

16951695
auto self = shared_from_this();
16961696
timer->async_wait([this, backoff, remainTime, timer, next, callback,
@@ -1814,9 +1814,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
18141814
}
18151815

18161816
void ConsumerImpl::cancelTimers() noexcept {
1817-
ASIO_ERROR ec;
1818-
batchReceiveTimer_->cancel(ec);
1819-
checkExpiredChunkedTimer_->cancel(ec);
1817+
cancelTimer(*batchReceiveTimer_);
1818+
cancelTimer(*checkExpiredChunkedTimer_);
18201819
unAckedMessageTrackerPtr_->stop();
18211820
consumerStatsBasePtr_->stop();
18221821
}

lib/ConsumerImplBase.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ ConsumerImplBase::ConsumerImplBase(const ClientImplPtr& client, const std::strin
4949

5050
void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
5151
if (timeoutMs > 0) {
52-
batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
52+
batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
5353
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
5454
batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
5555
auto self = weakSelf.lock();

lib/ExecutorService.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ void ExecutorService::start() {
3838
try {
3939
io_context_.run();
4040
LOG_DEBUG("Event loop of ExecutorService exits successfully");
41-
} catch (const ASIO_ERROR &e) {
42-
LOG_ERROR("Failed to run io_context: " << e.message());
41+
} catch (const ASIO_SYSTEM_ERROR &e) {
42+
LOG_ERROR("Failed to run io_context: " << e.what());
4343
}
4444
}
4545
{

lib/HandlerBase.cc

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
5050
redirectedClusterURI_("") {}
5151

5252
HandlerBase::~HandlerBase() {
53-
ASIO_ERROR ignored;
54-
timer_->cancel(ignored);
55-
creationTimer_->cancel(ignored);
53+
cancelTimer(*timer_);
54+
cancelTimer(*creationTimer_);
5655
}
5756

5857
void HandlerBase::start() {
@@ -61,15 +60,14 @@ void HandlerBase::start() {
6160
if (state_.compare_exchange_strong(state, Pending)) {
6261
grabCnx();
6362
}
64-
creationTimer_->expires_from_now(operationTimeut_);
63+
creationTimer_->expires_after(operationTimeut_);
6564
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
6665
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
6766
auto self = weakSelf.lock();
6867
if (self && !error) {
6968
LOG_WARN("Cancel the pending reconnection due to the start timeout");
7069
connectionFailed(ResultTimeout);
71-
ASIO_ERROR ignored;
72-
timer_->cancel(ignored);
70+
cancelTimer(*timer_);
7371
}
7472
});
7573
}
@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
133131
connectionTimeMs_ =
134132
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
135133
// Prevent the creationTimer_ from cancelling the timer_ in future
136-
ASIO_ERROR ignored;
137-
creationTimer_->cancel(ignored);
134+
cancelTimer(*creationTimer_);
138135
LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
139136
} else if (isResultRetryable(result)) {
140137
scheduleReconnection();
@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig
188185
TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();
189186

190187
LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
191-
timer_->expires_from_now(delay);
188+
timer_->expires_after(delay);
192189
// passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
193190
// so we will not run into the case where grabCnx is invoked on out of scope handler
194191
auto name = getName();

lib/MultiTopicsConsumerImpl.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ void MultiTopicsConsumerImpl::closeAsync(const ResultCallback& originalCallback)
507507
failPendingBatchReceiveCallback();
508508

509509
// cancel timer
510-
batchReceiveTimer_->cancel();
510+
cancelTimer(*batchReceiveTimer_);
511511
}
512512

513513
void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Message& msg) {
@@ -973,7 +973,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
973973
return numberOfConnectedConsumer;
974974
}
975975
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
976-
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
976+
partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
977977
auto weakSelf = weak_from_this();
978978
partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
979979
// If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
@@ -1126,8 +1126,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
11261126

11271127
void MultiTopicsConsumerImpl::cancelTimers() noexcept {
11281128
if (partitionsUpdateTimer_) {
1129-
ASIO_ERROR ec;
1130-
partitionsUpdateTimer_->cancel(ec);
1129+
cancelTimer(*partitionsUpdateTimer_);
11311130
}
11321131
}
11331132

lib/NegativeAcksTracker.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void NegativeAcksTracker::scheduleTimer() {
5656
return;
5757
}
5858
std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
59-
timer_->expires_from_now(timerInterval_);
59+
timer_->expires_after(timerInterval_);
6060
timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
6161
if (auto self = weakSelf.lock()) {
6262
self->handleTimer(ec);
@@ -135,8 +135,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
135135

136136
void NegativeAcksTracker::close() {
137137
closed_ = true;
138-
ASIO_ERROR ec;
139-
timer_->cancel(ec);
138+
cancelTimer(*timer_);
140139
std::lock_guard<std::mutex> lock(mutex_);
141140
nackedMessages_.clear();
142141
}

lib/PartitionedProducerImpl.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
421421

422422
void PartitionedProducerImpl::runPartitionUpdateTask() {
423423
auto weakSelf = weak_from_this();
424-
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
424+
partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
425425
partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
426426
auto self = weakSelf.lock();
427427
if (self) {
@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
524524

525525
void PartitionedProducerImpl::cancelTimers() noexcept {
526526
if (partitionsUpdateTimer_) {
527-
ASIO_ERROR ec;
528-
partitionsUpdateTimer_->cancel(ec);
527+
cancelTimer(*partitionsUpdateTimer_);
529528
}
530529
}
531530

0 commit comments

Comments
 (0)