Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
-DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON \
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \
-DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \
-DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED
shell: bash

- name: Build
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/build-ubuntu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ jobs:
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \
-DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} \
-DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \
-DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED \
-DBUILD_SHARED_LIBS=${{ env.build_shared_libs }}

- name: Build
Expand Down Expand Up @@ -136,4 +137,4 @@ jobs:
- name: Run integration test (Debug)
run: ./integration_test
working-directory: ${{ github.workspace }}/samples/integration_test/_build/debug
if: ${{ matrix.library_type != 'object' }}
if: ${{ matrix.library_type != 'object' }}
5 changes: 3 additions & 2 deletions .github/workflows/build-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
INSTALL_PREFIX: _install
PROJECT_NAME: tcp_pubsub
VS_TOOLSET: v140
VS_NAME: vs2015
VS_TOOLSET: v141
VS_NAME: vs2017

jobs:
build-windows:
Expand Down Expand Up @@ -71,6 +71,7 @@ jobs:
-DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON ^
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON ^
-DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} ^
-DCMAKE_CXX_FLAGS=/DASIO_NO_DEPRECATED ^
-DCMAKE_INSTALL_PREFIX=${{env.INSTALL_PREFIX}}

- name: Build (Release)
Expand Down
12 changes: 6 additions & 6 deletions tcp_pubsub/src/executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ namespace tcp_pubsub
{
Executor_Impl::Executor_Impl(const logger::logger_t& log_function)
: log_(log_function)
, io_service_(std::make_shared<asio::io_service>())
, dummy_work_(std::make_shared<asio::io_service::work>(*io_service_))
, io_context_(std::make_shared<asio::io_context>())
, dummy_work_(std::make_shared<work_guard_t>(io_context_->get_executor()))
{
#if (TCP_PUBSUB_LOG_DEBUG_ENABLED)
log_(logger::LogLevel::Debug, "Executor: Creating Executor.");
Expand Down Expand Up @@ -76,7 +76,7 @@ namespace tcp_pubsub
me->log_(logger::LogLevel::Debug, "Executor: IoService::Run() in thread " + thread_id);
#endif

me->io_service_->run();
me->io_context_->run();

#if (TCP_PUBSUB_LOG_DEBUG_ENABLED)
me->log_(logger::LogLevel::Debug, "Executor: IoService: Shutdown of thread " + thread_id);
Expand All @@ -95,12 +95,12 @@ namespace tcp_pubsub
dummy_work_.reset();

// Stop the IO Service
io_service_->stop();
io_context_->stop();
}

std::shared_ptr<asio::io_service> Executor_Impl::ioService() const
std::shared_ptr<asio::io_context> Executor_Impl::ioService() const
{
return io_service_;
return io_context_;
}

logger::logger_t Executor_Impl::logFunction() const
Expand Down
7 changes: 4 additions & 3 deletions tcp_pubsub/src/executor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace tcp_pubsub
void start(size_t thread_count);
void stop();

std::shared_ptr<asio::io_service> ioService() const;
std::shared_ptr<asio::io_context> ioService() const;
logger::logger_t logFunction() const;


Expand All @@ -43,9 +43,10 @@ namespace tcp_pubsub

private:
const logger::logger_t log_; /// Logger
std::shared_ptr<asio::io_service> io_service_; /// global io service
std::shared_ptr<asio::io_context> io_context_; /// global io service

std::vector<std::thread> thread_pool_; /// Asio threadpool executing the io servic
std::shared_ptr<asio::io_service::work> dummy_work_; /// Dummy work, so the io_service will never run out of work and shut down, even if there is no publisher or subscriber at the moment
using work_guard_t = asio::executor_work_guard<asio::io_context::executor_type>;
std::shared_ptr<work_guard_t> dummy_work_; /// Dummy work, so the io_context will never run out of work and shut down, even if there is no publisher or subscriber at the moment
};
}
2 changes: 1 addition & 1 deletion tcp_pubsub/src/publisher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ namespace tcp_pubsub
std::atomic<bool> is_running_; /// Indicates whether this publisher is running and can send data. May be false, if e.g. binding to the given address has failed.

// Asio
const std::shared_ptr<Executor> executor_; /// Global Executor (holding the io_service and thread pool)
const std::shared_ptr<Executor> executor_; /// Global Executor (holding the io_context and thread pool)
asio::ip::tcp::acceptor acceptor_; /// Acceptor used for waiting for clients (i.e. subscribers)

// Logger
Expand Down
18 changes: 9 additions & 9 deletions tcp_pubsub/src/publisher_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ namespace tcp_pubsub
/// Constructor & Destructor
//////////////////////////////////////////////

PublisherSession::PublisherSession(const std::shared_ptr<asio::io_service>& io_service
PublisherSession::PublisherSession(const std::shared_ptr<asio::io_context>& io_context
, const std::function<void(const std::shared_ptr<PublisherSession>&)>& session_closed_handler
, const tcp_pubsub::logger::logger_t& log_function)
: io_service_ (io_service)
: io_context_ (io_context)
, state_ (State::NotStarted)
, session_closed_handler_ (session_closed_handler)
, log_ (log_function)
, data_socket_ (*io_service_)
, data_strand_ (*io_service_)
, data_socket_ (*io_context_)
, data_strand_ (*io_context_)
, sending_in_progress_ (false)
{
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
Expand Down Expand Up @@ -132,7 +132,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&(header->header_size), sizeof(header->header_size))
, asio::transfer_at_least(sizeof(header->header_size))
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -165,7 +165,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&reinterpret_cast<char*>(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket)
, asio::transfer_at_least(bytes_to_read_from_socket)
, data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -207,7 +207,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_to_discard.data(), bytes_to_discard)
, asio::transfer_at_least(bytes_to_discard)
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -240,7 +240,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_buffer->data(), le64toh(header->data_size))
, asio::transfer_at_least(le64toh(header->data_size))
, data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -341,7 +341,7 @@ namespace tcp_pubsub

asio::async_write(data_socket_
, asio::buffer(*buffer)
, data_strand_.wrap(
, asio::bind_executor(data_strand_,
[me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
{
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
Expand Down
8 changes: 4 additions & 4 deletions tcp_pubsub/src/publisher_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace tcp_pubsub
/// Constructor & Destructor
//////////////////////////////////////////////
public:
PublisherSession(const std::shared_ptr<asio::io_service>& io_service
PublisherSession(const std::shared_ptr<asio::io_context>& io_context
, const std::function<void(const std::shared_ptr<PublisherSession>&)>& session_closed_handler
, const tcp_pubsub::logger::logger_t& log_function);

Expand Down Expand Up @@ -97,7 +97,7 @@ namespace tcp_pubsub
//////////////////////////////////////////////
private:
// Asio IO Service
std::shared_ptr<asio::io_service> io_service_;
std::shared_ptr<asio::io_context> io_context_;

// Whether the session has been canceled
std::atomic<State> state_;
Expand All @@ -109,11 +109,11 @@ namespace tcp_pubsub

// TCP Socket & Queue (protected by the strand!)
asio::ip::tcp::socket data_socket_;
asio::io_service::strand data_strand_;
asio::io_context::strand data_strand_;

// Variable holding if we are currently sending any data and what data to send next
std::mutex next_buffer_mutex_;
bool sending_in_progress_;
std::shared_ptr<std::vector<char>> next_buffer_to_send_;
};
}
}
58 changes: 29 additions & 29 deletions tcp_pubsub/src/subscriber_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ namespace tcp_pubsub
/// Constructor & Destructor
//////////////////////////////////////////////

SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr<asio::io_service>& io_service
SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr<asio::io_context>& io_context
, const std::vector<std::pair<std::string, uint16_t>>& publisher_list
, int max_reconnection_attempts
, const std::function<std::shared_ptr<std::vector<char>>()>& get_buffer_handler
, const std::function<void(const std::shared_ptr<SubscriberSession_Impl>&)>& session_closed_handler
, const tcp_pubsub::logger::logger_t& log_function)
: publisher_list_ (publisher_list)
, resolver_ (*io_service)
, resolver_ (*io_context)
, max_reconnection_attempts_(max_reconnection_attempts)
, retries_left_ (max_reconnection_attempts)
, retry_timer_ (*io_service, std::chrono::seconds(1))
, retry_timer_ (*io_context, std::chrono::seconds(1))
, canceled_ (false)
, data_socket_ (*io_service)
, data_strand_ (*io_service)
, data_socket_ (*io_context)
, data_strand_ (*io_context)
, get_buffer_handler_ (get_buffer_handler)
, session_closed_handler_ (session_closed_handler)
, log_ (log_function)
Expand Down Expand Up @@ -88,16 +88,15 @@ namespace tcp_pubsub

void SubscriberSession_Impl::resolveEndpoint(size_t publisher_list_index)
{
const asio::ip::tcp::resolver::query query(publisher_list_[publisher_list_index].first, std::to_string(publisher_list_[publisher_list_index].second));

if (canceled_)
{
connectionFailedHandler();
return;
}

resolver_.async_resolve(query
, [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::iterator& resolved_endpoints)
resolver_.async_resolve(publisher_list_[publisher_list_index].first
, std::to_string(publisher_list_[publisher_list_index].second)
, [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::results_type& resolved_endpoints)
{
if (ec)
{
Expand Down Expand Up @@ -136,7 +135,7 @@ namespace tcp_pubsub
});
}

void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t publisher_list_index)
void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::results_type& resolved_endpoints, size_t publisher_list_index)
{
if (canceled_)
{
Expand All @@ -147,9 +146,9 @@ namespace tcp_pubsub
// Convert the resolved_endpoints iterator to an endpoint sequence
// (i.e. a vector of endpoints)
auto endpoint_sequence = std::make_shared<std::vector<asio::ip::tcp::endpoint>>();
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
for (const auto& endpoint : resolved_endpoints)
{
endpoint_sequence->push_back(*it);
endpoint_sequence->push_back(endpoint);
}

asio::async_connect(data_socket_
Expand Down Expand Up @@ -220,7 +219,7 @@ namespace tcp_pubsub

asio::async_write(data_socket_
, asio::buffer(*buffer)
, data_strand_.wrap(
, asio::bind_executor(data_strand_,
[me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
{
if (ec)
Expand Down Expand Up @@ -301,7 +300,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&(header->header_size), sizeof(header->header_size))
, asio::transfer_at_least(sizeof(header->header_size))
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -337,7 +336,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&reinterpret_cast<char*>(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket)
, asio::transfer_at_least(bytes_to_read_from_socket)
, data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -380,7 +379,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_to_discard.data(), bytes_to_discard)
, asio::transfer_at_least(bytes_to_discard)
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -424,7 +423,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_buffer->data(), le64toh(header->data_size))
, asio::transfer_at_least(le64toh(header->data_size))
, data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -457,7 +456,7 @@ namespace tcp_pubsub
me->log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + me->endpointToString() + ": Received message of type \"RegularPayload\"");
#endif
// Call the callback first, ...
me->data_strand_.post([me, data_buffer, header]()
asio::post(me->data_strand_, [me, data_buffer, header]()
{
if (me->canceled_)
{
Expand All @@ -476,10 +475,9 @@ namespace tcp_pubsub
}

// ... then start reading the next message
me->data_strand_.post([me]()
{
me->readHeaderLength();
});
asio::post(me->data_strand_, [me]() {
me->readHeaderLength();
});
}));
}

Expand All @@ -495,7 +493,7 @@ namespace tcp_pubsub
// - We can protect the variable with the data_strand => If the callback is currently running, the new callback will be applied afterwards
// - We don't need an additional mutex, so a synchronous callback should actually be able to set another callback that gets activated once the current callback call ends
// - Reading the next message will start once the callback call is finished. Therefore, read and callback are synchronized and the callback calls don't start stacking up
data_strand_.post([me = shared_from_this(), callback]()
asio::post(data_strand_, [me = shared_from_this(), callback]()
{
me->synchronous_callback_ = callback;
});
Expand Down Expand Up @@ -545,14 +543,16 @@ namespace tcp_pubsub
}

{
asio::error_code ec;
retry_timer_.cancel(ec);
try {
static_cast<void>(retry_timer_.cancel());
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
if (ec)
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + ec.message());
else
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer.");
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer.");
#endif
} catch (asio::system_error& err){
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + err.what());
#endif
}
}

resolver_.cancel();
Expand Down
Loading