Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
-DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON \
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \
-DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \
-DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED
shell: bash

- name: Build
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/build-ubuntu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ jobs:
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \
-DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} \
-DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \
-DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED \
-DBUILD_SHARED_LIBS=${{ env.build_shared_libs }}

- name: Build
Expand Down Expand Up @@ -136,4 +137,4 @@ jobs:
- name: Run integration test (Debug)
run: ./integration_test
working-directory: ${{ github.workspace }}/samples/integration_test/_build/debug
if: ${{ matrix.library_type != 'object' }}
if: ${{ matrix.library_type != 'object' }}
1 change: 1 addition & 0 deletions .github/workflows/build-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ jobs:
-DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON ^
-DTCP_PUBSUB_USE_BUILTIN_GTEST=ON ^
-DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} ^
-DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED ^
-DCMAKE_INSTALL_PREFIX=${{env.INSTALL_PREFIX}}

- name: Build (Release)
Expand Down
12 changes: 6 additions & 6 deletions tcp_pubsub/src/executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ namespace tcp_pubsub
{
Executor_Impl::Executor_Impl(const logger::logger_t& log_function)
: log_(log_function)
, io_service_(std::make_shared<asio::io_service>())
, dummy_work_(std::make_shared<asio::io_service::work>(*io_service_))
, io_context_(std::make_shared<asio::io_context>())
, dummy_work_(std::make_shared<work_guard_t>(io_context_->get_executor()))
{
#if (TCP_PUBSUB_LOG_DEBUG_ENABLED)
log_(logger::LogLevel::Debug, "Executor: Creating Executor.");
Expand Down Expand Up @@ -76,7 +76,7 @@ namespace tcp_pubsub
me->log_(logger::LogLevel::Debug, "Executor: IoService::Run() in thread " + thread_id);
#endif

me->io_service_->run();
me->io_context_->run();

#if (TCP_PUBSUB_LOG_DEBUG_ENABLED)
me->log_(logger::LogLevel::Debug, "Executor: IoService: Shutdown of thread " + thread_id);
Expand All @@ -95,12 +95,12 @@ namespace tcp_pubsub
dummy_work_.reset();

// Stop the IO Service
io_service_->stop();
io_context_->stop();
}

std::shared_ptr<asio::io_service> Executor_Impl::ioService() const
std::shared_ptr<asio::io_context> Executor_Impl::ioService() const
{
return io_service_;
return io_context_;
}

logger::logger_t Executor_Impl::logFunction() const
Expand Down
7 changes: 4 additions & 3 deletions tcp_pubsub/src/executor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace tcp_pubsub
void start(size_t thread_count);
void stop();

std::shared_ptr<asio::io_service> ioService() const;
std::shared_ptr<asio::io_context> ioService() const;
logger::logger_t logFunction() const;


Expand All @@ -43,9 +43,10 @@ namespace tcp_pubsub

private:
const logger::logger_t log_; /// Logger
std::shared_ptr<asio::io_service> io_service_; /// global io service
std::shared_ptr<asio::io_context> io_context_; /// global io service

std::vector<std::thread> thread_pool_; /// Asio threadpool executing the io service
std::shared_ptr<asio::io_service::work> dummy_work_; /// Dummy work, so the io_service will never run out of work and shut down, even if there is no publisher or subscriber at the moment
using work_guard_t = asio::executor_work_guard<asio::io_context::executor_type>;
std::shared_ptr<work_guard_t> dummy_work_; /// Dummy work, so the io_context will never run out of work and shut down, even if there is no publisher or subscriber at the moment
};
}
2 changes: 1 addition & 1 deletion tcp_pubsub/src/publisher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ namespace tcp_pubsub
std::atomic<bool> is_running_; /// Indicates whether this publisher is running and can send data. May be false, if e.g. binding to the given address has failed.

// Asio
const std::shared_ptr<Executor> executor_; /// Global Executor (holding the io_service and thread pool)
const std::shared_ptr<Executor> executor_; /// Global Executor (holding the io_context and thread pool)
asio::ip::tcp::acceptor acceptor_; /// Acceptor used for waiting for clients (i.e. subscribers)

// Logger
Expand Down
18 changes: 9 additions & 9 deletions tcp_pubsub/src/publisher_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ namespace tcp_pubsub
/// Constructor & Destructor
//////////////////////////////////////////////

PublisherSession::PublisherSession(const std::shared_ptr<asio::io_service>& io_service
PublisherSession::PublisherSession(const std::shared_ptr<asio::io_context>& io_context
, const std::function<void(const std::shared_ptr<PublisherSession>&)>& session_closed_handler
, const tcp_pubsub::logger::logger_t& log_function)
: io_service_ (io_service)
: io_context_ (io_context)
, state_ (State::NotStarted)
, session_closed_handler_ (session_closed_handler)
, log_ (log_function)
, data_socket_ (*io_service_)
, data_strand_ (*io_service_)
, data_socket_ (*io_context_)
, data_strand_ (*io_context_)
, sending_in_progress_ (false)
{
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
Expand Down Expand Up @@ -132,7 +132,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&(header->header_size), sizeof(header->header_size))
, asio::transfer_at_least(sizeof(header->header_size))
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -165,7 +165,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&reinterpret_cast<char*>(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket)
, asio::transfer_at_least(bytes_to_read_from_socket)
, data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -207,7 +207,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_to_discard.data(), bytes_to_discard)
, asio::transfer_at_least(bytes_to_discard)
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -240,7 +240,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_buffer->data(), le64toh(header->data_size))
, asio::transfer_at_least(le64toh(header->data_size))
, data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -341,7 +341,7 @@ namespace tcp_pubsub

asio::async_write(data_socket_
, asio::buffer(*buffer)
, data_strand_.wrap(
, asio::bind_executor(data_strand_,
[me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
{
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
Expand Down
8 changes: 4 additions & 4 deletions tcp_pubsub/src/publisher_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace tcp_pubsub
/// Constructor & Destructor
//////////////////////////////////////////////
public:
PublisherSession(const std::shared_ptr<asio::io_service>& io_service
PublisherSession(const std::shared_ptr<asio::io_context>& io_context
, const std::function<void(const std::shared_ptr<PublisherSession>&)>& session_closed_handler
, const tcp_pubsub::logger::logger_t& log_function);

Expand Down Expand Up @@ -97,7 +97,7 @@ namespace tcp_pubsub
//////////////////////////////////////////////
private:
// Asio IO Service
std::shared_ptr<asio::io_service> io_service_;
std::shared_ptr<asio::io_context> io_context_;

// Whether the session has been canceled
std::atomic<State> state_;
Expand All @@ -109,11 +109,11 @@ namespace tcp_pubsub

// TCP Socket & Queue (protected by the strand!)
asio::ip::tcp::socket data_socket_;
asio::io_service::strand data_strand_;
asio::io_context::strand data_strand_;

// Variable holding if we are currently sending any data and what data to send next
std::mutex next_buffer_mutex_;
bool sending_in_progress_;
std::shared_ptr<std::vector<char>> next_buffer_to_send_;
};
}
}
56 changes: 30 additions & 26 deletions tcp_pubsub/src/subscriber_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ namespace tcp_pubsub
/// Constructor & Destructor
//////////////////////////////////////////////

SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr<asio::io_service>& io_service
SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr<asio::io_context>& io_context
, const std::vector<std::pair<std::string, uint16_t>>& publisher_list
, int max_reconnection_attempts
, const std::function<std::shared_ptr<std::vector<char>>()>& get_buffer_handler
, const std::function<void(const std::shared_ptr<SubscriberSession_Impl>&)>& session_closed_handler
, const tcp_pubsub::logger::logger_t& log_function)
: publisher_list_ (publisher_list)
, resolver_ (*io_service)
, resolver_ (*io_context)
, max_reconnection_attempts_(max_reconnection_attempts)
, retries_left_ (max_reconnection_attempts)
, retry_timer_ (*io_service, std::chrono::seconds(1))
, retry_timer_ (*io_context, std::chrono::seconds(1))
, canceled_ (false)
, data_socket_ (*io_service)
, data_strand_ (*io_service)
, data_socket_ (*io_context)
, data_strand_ (*io_context)
, get_buffer_handler_ (get_buffer_handler)
, session_closed_handler_ (session_closed_handler)
, log_ (log_function)
Expand Down Expand Up @@ -88,16 +88,15 @@ namespace tcp_pubsub

void SubscriberSession_Impl::resolveEndpoint(size_t publisher_list_index)
{
const asio::ip::tcp::resolver::query query(publisher_list_[publisher_list_index].first, std::to_string(publisher_list_[publisher_list_index].second));

if (canceled_)
{
connectionFailedHandler();
return;
}

resolver_.async_resolve(query
, [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::iterator& resolved_endpoints)
resolver_.async_resolve(publisher_list_[publisher_list_index].first
, std::to_string(publisher_list_[publisher_list_index].second)
, [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::results_type& resolved_endpoints)
{
if (ec)
{
Expand Down Expand Up @@ -136,7 +135,7 @@ namespace tcp_pubsub
});
}

void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t publisher_list_index)
void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::results_type& resolved_endpoints, size_t publisher_list_index)
{
if (canceled_)
{
Expand All @@ -147,9 +146,9 @@ namespace tcp_pubsub
// Convert the resolved_endpoints iterator to an endpoint sequence
// (i.e. a vector of endpoints)
auto endpoint_sequence = std::make_shared<std::vector<asio::ip::tcp::endpoint>>();
for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it)
for (const auto& endpoint : resolved_endpoints)
{
endpoint_sequence->push_back(*it);
endpoint_sequence->push_back(endpoint);
}

asio::async_connect(data_socket_
Expand Down Expand Up @@ -220,7 +219,7 @@ namespace tcp_pubsub

asio::async_write(data_socket_
, asio::buffer(*buffer)
, data_strand_.wrap(
, asio::bind_executor(data_strand_,
[me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/)
{
if (ec)
Expand Down Expand Up @@ -301,7 +300,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&(header->header_size), sizeof(header->header_size))
, asio::transfer_at_least(sizeof(header->header_size))
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -337,7 +336,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(&reinterpret_cast<char*>(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket)
, asio::transfer_at_least(bytes_to_read_from_socket)
, data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -380,7 +379,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_to_discard.data(), bytes_to_discard)
, asio::transfer_at_least(bytes_to_discard)
, data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -424,7 +423,7 @@ namespace tcp_pubsub
asio::async_read(data_socket_
, asio::buffer(data_buffer->data(), le64toh(header->data_size))
, asio::transfer_at_least(le64toh(header->data_size))
, data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
, asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/)
{
if (ec)
{
Expand Down Expand Up @@ -465,7 +464,8 @@ namespace tcp_pubsub
return;
}
me->synchronous_callback_(data_buffer, header);
});
},
asio::get_associated_allocator(me->data_strand_));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this line of code necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the remaining signatures of io_context_strand::post requires an allocator even though the function doesn't use it...

https://github.com/chriskohlhoff/asio/blob/a892f73dc96bfaf92db98a3fe219f920fad007ea/asio/include/asio/io_context_strand.hpp#L174-L193

Looking at the commit that removed the originally used function, it actually says to use asio::post which does not require an allocator.

chriskohlhoff/asio@2f9c4ef#diff-a9951ef828e91a6ea4683ee5f1da1046abaefaa093db694a8a9825a098f50431

Thank you. I will make that change.


}
else
Expand All @@ -479,7 +479,8 @@ namespace tcp_pubsub
me->data_strand_.post([me]()
{
me->readHeaderLength();
});
},
asio::get_associated_allocator(me->data_strand_));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this line of code necessary?

}));
}

Expand All @@ -498,7 +499,8 @@ namespace tcp_pubsub
data_strand_.post([me = shared_from_this(), callback]()
{
me->synchronous_callback_ = callback;
});
},
asio::get_associated_allocator(data_strand_));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this line of code necessary?

}

std::vector<std::pair<std::string, uint16_t>> SubscriberSession_Impl::getPublisherList() const
Expand Down Expand Up @@ -545,14 +547,16 @@ namespace tcp_pubsub
}

{
asio::error_code ec;
retry_timer_.cancel(ec);
try {
static_cast<void>(retry_timer_.cancel());
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
if (ec)
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + ec.message());
else
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer.");
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer.");
#endif
} catch (asio::system_error& err){
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + err.what());
#endif
}
}

resolver_.cancel();
Expand Down
Loading
Loading