Skip to content

Commit cc30a7b

Browse files
committed
Adapt to latest Asio APIs (Asio 1.32 or Boost.Asio 1.87)
1 parent fca3827 commit cc30a7b

27 files changed

+237
-278
lines changed

.github/workflows/ci-pr-validation.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ jobs:
115115
- name: Run unit tests
116116
run: RETRY_FAILED=3 ./run-unit-tests.sh
117117

118+
- name: Build with Boost.Asio
119+
run: |
120+
cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON
121+
cmake --build build-boost-asio -j8
122+
118123
- name: Build perf tools
119124
run: |
120125
cmake . -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON

CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@
1919

2020
cmake_minimum_required(VERSION 3.13)
2121

22-
option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
23-
2422
option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
2523
if (INTEGRATE_VCPKG)
26-
set(USE_ASIO ON)
24+
option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
2725
if (NOT CMAKE_TOOLCHAIN_FILE)
2826
set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
2927
endif ()
28+
else ()
29+
option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
3030
endif ()
31+
message(STATUS "USE_ASIO: ${USE_ASIO}")
3132

3233
option(BUILD_TESTS "Build tests" ON)
3334
message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})

lib/AckGroupingTrackerEnabled.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() {
117117
this->flush();
118118
std::lock_guard<std::mutex> lock(this->mutexTimer_);
119119
if (this->timer_) {
120-
ASIO_ERROR ec;
121-
this->timer_->cancel(ec);
120+
this->timer_->cancel();
122121
}
123122
}
124123

@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
168167

169168
std::lock_guard<std::mutex> lock(this->mutexTimer_);
170169
this->timer_ = this->executor_->createDeadlineTimer();
171-
this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
170+
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
172171
auto self = shared_from_this();
173172
this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
174173
if (!ec) {

lib/ClientConnection.cc

Lines changed: 137 additions & 168 deletions
Large diffs are not rendered by default.

lib/ClientConnection.h

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
#include <atomic>
2626
#ifdef USE_ASIO
2727
#include <asio/bind_executor.hpp>
28-
#include <asio/io_service.hpp>
28+
#include <asio/io_context.hpp>
2929
#include <asio/ip/tcp.hpp>
3030
#include <asio/ssl/stream.hpp>
3131
#include <asio/strand.hpp>
3232
#else
3333
#include <boost/asio/bind_executor.hpp>
34-
#include <boost/asio/io_service.hpp>
34+
#include <boost/asio/io_context.hpp>
3535
#include <boost/asio/ip/tcp.hpp>
3636
#include <boost/asio/ssl/stream.hpp>
3737
#include <boost/asio/strand.hpp>
@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
231231
DeadlineTimerPtr timer;
232232
};
233233

234-
/*
235-
* handler for connectAsync
236-
* creates a ConnectionPtr which has a valid ClientConnection object
237-
* although not usable at this point, since this is just tcp connection
238-
* Pulsar - Connect/Connected has yet to happen
239-
*/
240-
void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
234+
void asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index);
235+
void completeConnect(ASIO::ip::tcp::endpoint endpoint);
241236

242237
void handleHandshake(const ASIO_ERROR& err);
243238

@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
260255

261256
void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
262257

263-
void handleResolve(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
264-
265258
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
266259
void handleSendPair(const ASIO_ERROR& err);
267260
void sendPendingCommands();
@@ -324,7 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
324317
*/
325318
SocketPtr socket_;
326319
TlsSocketPtr tlsSocket_;
327-
ASIO::strand<ASIO::io_service::executor_type> strand_;
320+
ASIO::strand<ASIO::io_context::executor_type> strand_;
328321

329322
const std::string logicalAddress_;
330323
/*

lib/ConsumerImpl.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b
422422
}
423423

424424
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
425-
checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
425+
checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
426426
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
427427
checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
428428
auto self = weakSelf.lock();
@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
16681668
}
16691669
remainTime -= next;
16701670

1671-
timer->expires_from_now(next);
1671+
timer->expires_after(next);
16721672

16731673
auto self = shared_from_this();
16741674
timer->async_wait([this, backoff, remainTime, timer, next, callback,
@@ -1791,9 +1791,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
17911791
}
17921792

17931793
void ConsumerImpl::cancelTimers() noexcept {
1794-
ASIO_ERROR ec;
1795-
batchReceiveTimer_->cancel(ec);
1796-
checkExpiredChunkedTimer_->cancel(ec);
1794+
batchReceiveTimer_->cancel();
1795+
checkExpiredChunkedTimer_->cancel();
17971796
unAckedMessageTrackerPtr_->stop();
17981797
consumerStatsBasePtr_->stop();
17991798
}

lib/ConsumerImplBase.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi
5151

5252
void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
5353
if (timeoutMs > 0) {
54-
batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
54+
batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
5555
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
5656
batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
5757
auto self = weakSelf.lock();

lib/ExecutorService.cc

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818
*/
1919
#include "ExecutorService.h"
2020

21+
#ifdef USE_ASIO
22+
#include <asio/post.hpp>
23+
#else
24+
#include <boost/asio/post.hpp>
25+
#endif
26+
2127
#include "LogUtils.h"
2228
#include "TimeUtils.h"
2329
DECLARE_LOG_OBJECT()
@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); }
3137
void ExecutorService::start() {
3238
auto self = shared_from_this();
3339
std::thread t{[this, self] {
34-
LOG_DEBUG("Run io_service in a single thread");
35-
ASIO_ERROR ec;
40+
LOG_DEBUG("Run io_context in a single thread");
3641
while (!closed_) {
37-
io_service_.restart();
38-
IOService::work work{getIOService()};
39-
io_service_.run(ec);
40-
}
41-
if (ec) {
42-
LOG_ERROR("Failed to run io_service: " << ec.message());
43-
} else {
44-
LOG_DEBUG("Event loop of ExecutorService exits successfully");
42+
io_context_.restart();
43+
auto work{ASIO::make_work_guard(io_context_)};
44+
io_context_.run();
4545
}
46+
LOG_DEBUG("Event loop of ExecutorService exits successfully");
4647
{
4748
std::lock_guard<std::mutex> lock{mutex_};
4849
ioServiceDone_ = true;
@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() {
6364
}
6465

6566
/*
66-
* factory method of ASIO::ip::tcp::socket associated with io_service_ instance
67+
* factory method of ASIO::ip::tcp::socket associated with io_context_ instance
6768
* @ returns shared_ptr to this socket
6869
*/
6970
SocketPtr ExecutorService::createSocket() {
7071
try {
71-
return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
72+
return SocketPtr(new ASIO::ip::tcp::socket(io_context_));
7273
} catch (const ASIO_SYSTEM_ERROR &e) {
7374
restart();
7475
auto error = std::string("Failed to create socket: ") + e.what();
@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont
8283
}
8384

8485
/*
85-
* factory method of Resolver object associated with io_service_ instance
86+
* factory method of Resolver object associated with io_context_ instance
8687
* @returns shraed_ptr to resolver object
8788
*/
8889
TcpResolverPtr ExecutorService::createTcpResolver() {
8990
try {
90-
return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
91+
return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_));
9192
} catch (const ASIO_SYSTEM_ERROR &e) {
9293
restart();
9394
auto error = std::string("Failed to create resolver: ") + e.what();
@@ -97,36 +98,36 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
9798

9899
DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
99100
try {
100-
return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
101+
return DeadlineTimerPtr(new ASIO::steady_timer(io_context_));
101102
} catch (const ASIO_SYSTEM_ERROR &e) {
102103
restart();
103104
auto error = std::string("Failed to create steady_timer: ") + e.what();
104105
throw std::runtime_error(error);
105106
}
106107
}
107108

108-
void ExecutorService::restart() { io_service_.stop(); }
109+
void ExecutorService::restart() { io_context_.stop(); }
109110

110111
void ExecutorService::close(long timeoutMs) {
111112
bool expectedState = false;
112113
if (!closed_.compare_exchange_strong(expectedState, true)) {
113114
return;
114115
}
115116
if (timeoutMs == 0) { // non-blocking
116-
io_service_.stop();
117+
io_context_.stop();
117118
return;
118119
}
119120

120121
std::unique_lock<std::mutex> lock{mutex_};
121-
io_service_.stop();
122+
io_context_.stop();
122123
if (timeoutMs > 0) {
123124
cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; });
124125
} else { // < 0
125126
cond_.wait(lock, [this] { return ioServiceDone_; });
126127
}
127128
}
128129

129-
void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
130+
void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, task); }
130131

131132
/////////////////////
132133

lib/ExecutorService.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323

2424
#include <atomic>
2525
#ifdef USE_ASIO
26-
#include <asio/io_service.hpp>
26+
#include <asio/io_context.hpp>
2727
#include <asio/ip/tcp.hpp>
2828
#include <asio/ssl.hpp>
2929
#else
30-
#include <boost/asio/io_service.hpp>
30+
#include <boost/asio/io_context.hpp>
3131
#include <boost/asio/ip/tcp.hpp>
3232
#include <boost/asio/ssl.hpp>
3333
#endif
@@ -46,7 +46,7 @@ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt
4646
typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
4747
class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
4848
public:
49-
using IOService = ASIO::io_service;
49+
using IOService = ASIO::io_context;
5050
using SharedPtr = std::shared_ptr<ExecutorService>;
5151

5252
static SharedPtr create();
@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
6767
// See TimeoutProcessor for the semantics of the parameter.
6868
void close(long timeoutMs = 3000);
6969

70-
IOService &getIOService() { return io_service_; }
70+
IOService &getIOService() { return io_context_; }
7171
bool isClosed() const noexcept { return closed_; }
7272

7373
private:
7474
/*
75-
* io_service is our interface to os, io object schedule async ops on this object
75+
* io_context is our interface to os, io object schedule async ops on this object
7676
*/
77-
IOService io_service_;
77+
IOService io_context_;
7878

7979
std::atomic_bool closed_{false};
8080
std::mutex mutex_;

lib/HandlerBase.cc

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
5050
redirectedClusterURI_("") {}
5151

5252
HandlerBase::~HandlerBase() {
53-
ASIO_ERROR ignored;
54-
timer_->cancel(ignored);
55-
creationTimer_->cancel(ignored);
53+
timer_->cancel();
54+
creationTimer_->cancel();
5655
}
5756

5857
void HandlerBase::start() {
@@ -61,15 +60,14 @@ void HandlerBase::start() {
6160
if (state_.compare_exchange_strong(state, Pending)) {
6261
grabCnx();
6362
}
64-
creationTimer_->expires_from_now(operationTimeut_);
63+
creationTimer_->expires_after(operationTimeut_);
6564
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
6665
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
6766
auto self = weakSelf.lock();
6867
if (self && !error) {
6968
LOG_WARN("Cancel the pending reconnection due to the start timeout");
7069
connectionFailed(ResultTimeout);
71-
ASIO_ERROR ignored;
72-
timer_->cancel(ignored);
70+
timer_->cancel();
7371
}
7472
});
7573
}
@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
133131
connectionTimeMs_ =
134132
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
135133
// Prevent the creationTimer_ from cancelling the timer_ in future
136-
ASIO_ERROR ignored;
137-
creationTimer_->cancel(ignored);
134+
creationTimer_->cancel();
138135
LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
139136
} else if (isResultRetryable(result)) {
140137
scheduleReconnection();
@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig
188185
TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();
189186

190187
LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
191-
timer_->expires_from_now(delay);
188+
timer_->expires_after(delay);
192189
// passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
193190
// so we will not run into the case where grabCnx is invoked on out of scope handler
194191
auto name = getName();

0 commit comments

Comments
 (0)