From 8f8dd5730bf23ccd2f4fe24b8f925930c67c7830 Mon Sep 17 00:00:00 2001 From: DownerCase Date: Fri, 4 Apr 2025 19:17:45 +0100 Subject: [PATCH 1/7] Update asio to 1.34 --- tcp_pubsub/src/executor_impl.cpp | 6 +-- tcp_pubsub/src/executor_impl.h | 7 ++-- tcp_pubsub/src/publisher_session.cpp | 2 +- tcp_pubsub/src/publisher_session.h | 8 ++-- tcp_pubsub/src/subscriber_session_impl.cpp | 38 ++++++++++--------- tcp_pubsub/src/subscriber_session_impl.h | 6 +-- tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp | 4 +- thirdparty/asio/asio | 2 +- 8 files changed, 39 insertions(+), 34 deletions(-) diff --git a/tcp_pubsub/src/executor_impl.cpp b/tcp_pubsub/src/executor_impl.cpp index bfcf4c5..b49ded9 100644 --- a/tcp_pubsub/src/executor_impl.cpp +++ b/tcp_pubsub/src/executor_impl.cpp @@ -18,8 +18,8 @@ namespace tcp_pubsub { Executor_Impl::Executor_Impl(const logger::logger_t& log_function) : log_(log_function) - , io_service_(std::make_shared()) - , dummy_work_(std::make_shared(*io_service_)) + , io_service_(std::make_shared()) + , dummy_work_(std::make_shared(io_service_->get_executor())) { #if (TCP_PUBSUB_LOG_DEBUG_ENABLED) log_(logger::LogLevel::Debug, "Executor: Creating Executor."); @@ -98,7 +98,7 @@ namespace tcp_pubsub io_service_->stop(); } - std::shared_ptr Executor_Impl::ioService() const + std::shared_ptr Executor_Impl::ioService() const { return io_service_; } diff --git a/tcp_pubsub/src/executor_impl.h b/tcp_pubsub/src/executor_impl.h index ca8716d..a409884 100644 --- a/tcp_pubsub/src/executor_impl.h +++ b/tcp_pubsub/src/executor_impl.h @@ -32,7 +32,7 @@ namespace tcp_pubsub void start(size_t thread_count); void stop(); - std::shared_ptr ioService() const; + std::shared_ptr ioService() const; logger::logger_t logFunction() const; @@ -43,9 +43,10 @@ namespace tcp_pubsub private: const logger::logger_t log_; /// Logger - std::shared_ptr io_service_; /// global io service + std::shared_ptr io_service_; /// global io service std::vector thread_pool_; /// Asio threadpool executing the io servic - std::shared_ptr dummy_work_; /// Dummy work, so the io_service will never run out of work and shut down, even if there is no publisher or subscriber at the moment + using work_guard_t = asio::executor_work_guard; + std::shared_ptr dummy_work_; /// Dummy work, so the io_service will never run out of work and shut down, even if there is no publisher or subscriber at the moment }; } diff --git a/tcp_pubsub/src/publisher_session.cpp b/tcp_pubsub/src/publisher_session.cpp index 1b742e4..197d14d 100644 --- a/tcp_pubsub/src/publisher_session.cpp +++ b/tcp_pubsub/src/publisher_session.cpp @@ -30,7 +30,7 @@ namespace tcp_pubsub /// Constructor & Destructor ////////////////////////////////////////////// - PublisherSession::PublisherSession(const std::shared_ptr& io_service + PublisherSession::PublisherSession(const std::shared_ptr& io_service , const std::function&)>& session_closed_handler , const tcp_pubsub::logger::logger_t& log_function) : io_service_ (io_service) diff --git a/tcp_pubsub/src/publisher_session.h b/tcp_pubsub/src/publisher_session.h index dc7583b..79e84e8 100644 --- a/tcp_pubsub/src/publisher_session.h +++ b/tcp_pubsub/src/publisher_session.h @@ -36,7 +36,7 @@ namespace tcp_pubsub /// Constructor & Destructor ////////////////////////////////////////////// public: - PublisherSession(const std::shared_ptr& io_service + PublisherSession(const std::shared_ptr& io_service , const std::function&)>& session_closed_handler , const tcp_pubsub::logger::logger_t& log_function); @@ -97,7 +97,7 @@ namespace tcp_pubsub ////////////////////////////////////////////// private: // Asio IO Service - std::shared_ptr io_service_; + std::shared_ptr io_service_; // Whether the session has been canceled std::atomic state_; @@ -109,11 +109,11 @@ namespace tcp_pubsub // TCP Socket & Queue (protected by the strand!) asio::ip::tcp::socket data_socket_; - asio::io_service::strand data_strand_; + asio::io_context::strand data_strand_; // Variable holding if we are currently sending any data and what data to send next std::mutex next_buffer_mutex_; bool sending_in_progress_; std::shared_ptr> next_buffer_to_send_; }; -} \ No newline at end of file +} diff --git a/tcp_pubsub/src/subscriber_session_impl.cpp b/tcp_pubsub/src/subscriber_session_impl.cpp index 540f105..b63a193 100644 --- a/tcp_pubsub/src/subscriber_session_impl.cpp +++ b/tcp_pubsub/src/subscriber_session_impl.cpp @@ -32,7 +32,7 @@ namespace tcp_pubsub /// Constructor & Destructor ////////////////////////////////////////////// - SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr& io_service + SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr& io_service , const std::vector>& publisher_list , int max_reconnection_attempts , const std::function>()>& get_buffer_handler @@ -88,16 +88,15 @@ namespace tcp_pubsub void SubscriberSession_Impl::resolveEndpoint(size_t publisher_list_index) { - const asio::ip::tcp::resolver::query query(publisher_list_[publisher_list_index].first, std::to_string(publisher_list_[publisher_list_index].second)); - if (canceled_) { connectionFailedHandler(); return; } - resolver_.async_resolve(query - , [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::iterator& resolved_endpoints) + resolver_.async_resolve(publisher_list_[publisher_list_index].first + , std::to_string(publisher_list_[publisher_list_index].second) + , [me = shared_from_this(), publisher_list_index](asio::error_code ec, const asio::ip::tcp::resolver::results_type& resolved_endpoints) { if (ec) { @@ -136,7 +135,7 @@ namespace tcp_pubsub }); } - void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::iterator& resolved_endpoints, size_t publisher_list_index) + void SubscriberSession_Impl::connectToEndpoint(const asio::ip::tcp::resolver::results_type& resolved_endpoints, size_t publisher_list_index) { if (canceled_) { @@ -147,9 +146,9 @@ namespace tcp_pubsub // Convert the resolved_endpoints iterator to an endpoint sequence // (i.e. a vector of endpoints) auto endpoint_sequence = std::make_shared>(); - for (auto it = resolved_endpoints; it != asio::ip::tcp::resolver::iterator(); ++it) + for (const auto& endpoint : resolved_endpoints) { - endpoint_sequence->push_back(*it); + endpoint_sequence->push_back(endpoint); } asio::async_connect(data_socket_ @@ -465,7 +464,8 @@ namespace tcp_pubsub return; } me->synchronous_callback_(data_buffer, header); - }); + }, + asio::get_associated_allocator(me->data_strand_)); } else @@ -479,7 +479,8 @@ namespace tcp_pubsub me->data_strand_.post([me]() { me->readHeaderLength(); - }); + }, + asio::get_associated_allocator(me->data_strand_)); })); } @@ -498,7 +499,8 @@ namespace tcp_pubsub data_strand_.post([me = shared_from_this(), callback]() { me->synchronous_callback_ = callback; - }); + }, + asio::get_associated_allocator(data_strand_)); } std::vector> SubscriberSession_Impl::getPublisherList() const @@ -545,14 +547,16 @@ namespace tcp_pubsub } { - asio::error_code ec; - retry_timer_.cancel(ec); + try { + static_cast(retry_timer_.cancel()); #if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED) - if (ec) - log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + ec.message()); - else - log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer."); + log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Successfully canceled retry timer."); #endif + } catch (asio::system_error& err){ +#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED) + log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + endpointToString() + ": Failed canceling retry timer: " + err.what()); +#endif + } } resolver_.cancel(); diff --git a/tcp_pubsub/src/subscriber_session_impl.h b/tcp_pubsub/src/subscriber_session_impl.h index 56b3dd4..359692c 100644 --- a/tcp_pubsub/src/subscriber_session_impl.h +++ b/tcp_pubsub/src/subscriber_session_impl.h @@ -25,7 +25,7 @@ class SubscriberSession_Impl : public std::enable_shared_from_this& io_service + SubscriberSession_Impl(const std::shared_ptr& io_service , const std::vector>& publisher_list , int max_reconnection_attempts , const std::function>()>& get_buffer_handler @@ -52,7 +52,7 @@ class SubscriberSession_Impl : public std::enable_shared_from_this>()> get_buffer_handler_; ///< Function for retrieving / constructing an empty buffer diff --git a/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp b/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp index 4fda36e..8533621 100644 --- a/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp +++ b/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp @@ -344,8 +344,8 @@ TEST(tcp_pubsub, publisher_list_test) // Register the callback hello_world_subscriber.setCallback(callback_function); - // Wait up to 2 seconds for the subscriber to connect - for (int i = 0; i < 20; ++i) + // Wait up to 4.5 seconds for the subscriber to connect + for (int i = 0; i < 45; ++i) { if (hello_world_subscriber.getSessions().at(0)->isConnected() && hello_world_publisher.getSubscriberCount() >= 1) diff --git a/thirdparty/asio/asio b/thirdparty/asio/asio index 03ae834..a892f73 160000 --- a/thirdparty/asio/asio +++ b/thirdparty/asio/asio @@ -1 +1 @@ -Subproject commit 03ae834edbace31a96157b89bf50e5ee464e5ef9 +Subproject commit a892f73dc96bfaf92db98a3fe219f920fad007ea From befba60509af64a27f73766a9df4ad6027218364 Mon Sep 17 00:00:00 2001 From: DownerCase Date: Mon, 7 Apr 2025 09:19:32 +0100 Subject: [PATCH 2/7] Replace .wrap with bind_executor --- tcp_pubsub/src/publisher_session.cpp | 10 +++++----- tcp_pubsub/src/subscriber_session_impl.cpp | 10 +++++----- tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp | 4 ++-- thirdparty/asio/Module/Findasio.cmake | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tcp_pubsub/src/publisher_session.cpp b/tcp_pubsub/src/publisher_session.cpp index 197d14d..e485cfd 100644 --- a/tcp_pubsub/src/publisher_session.cpp +++ b/tcp_pubsub/src/publisher_session.cpp @@ -132,7 +132,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(&(header->header_size), sizeof(header->header_size)) , asio::transfer_at_least(sizeof(header->header_size)) - , data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) { if (ec) { @@ -165,7 +165,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(&reinterpret_cast(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket) , asio::transfer_at_least(bytes_to_read_from_socket) - , data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/) { if (ec) { @@ -207,7 +207,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(data_to_discard.data(), bytes_to_discard) , asio::transfer_at_least(bytes_to_discard) - , data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) { if (ec) { @@ -240,7 +240,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(data_buffer->data(), le64toh(header->data_size)) , asio::transfer_at_least(le64toh(header->data_size)) - , data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/) { if (ec) { @@ -341,7 +341,7 @@ namespace tcp_pubsub asio::async_write(data_socket_ , asio::buffer(*buffer) - , data_strand_.wrap( + , asio::bind_executor(data_strand_, [me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/) { #if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED) diff --git a/tcp_pubsub/src/subscriber_session_impl.cpp b/tcp_pubsub/src/subscriber_session_impl.cpp index b63a193..93e87b5 100644 --- a/tcp_pubsub/src/subscriber_session_impl.cpp +++ b/tcp_pubsub/src/subscriber_session_impl.cpp @@ -219,7 +219,7 @@ namespace tcp_pubsub asio::async_write(data_socket_ , asio::buffer(*buffer) - , data_strand_.wrap( + , asio::bind_executor(data_strand_, [me = shared_from_this(), buffer](asio::error_code ec, std::size_t /*bytes_to_transfer*/) { if (ec) @@ -300,7 +300,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(&(header->header_size), sizeof(header->header_size)) , asio::transfer_at_least(sizeof(header->header_size)) - , data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) { if (ec) { @@ -336,7 +336,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(&reinterpret_cast(header.get())[sizeof(header->header_size)], bytes_to_read_from_socket) , asio::transfer_at_least(bytes_to_read_from_socket) - , data_strand_.wrap([me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header, bytes_to_discard_from_socket](asio::error_code ec, std::size_t /*length*/) { if (ec) { @@ -379,7 +379,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(data_to_discard.data(), bytes_to_discard) , asio::transfer_at_least(bytes_to_discard) - , data_strand_.wrap([me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header](asio::error_code ec, std::size_t /*length*/) { if (ec) { @@ -423,7 +423,7 @@ namespace tcp_pubsub asio::async_read(data_socket_ , asio::buffer(data_buffer->data(), le64toh(header->data_size)) , asio::transfer_at_least(le64toh(header->data_size)) - , data_strand_.wrap([me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/) + , asio::bind_executor(data_strand_, [me = shared_from_this(), header, data_buffer](asio::error_code ec, std::size_t /*length*/) { if (ec) { diff --git a/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp b/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp index 8533621..9793685 100644 --- a/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp +++ b/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp @@ -344,8 +344,8 @@ TEST(tcp_pubsub, publisher_list_test) // Register the callback hello_world_subscriber.setCallback(callback_function); - // Wait up to 4.5 seconds for the subscriber to connect - for (int i = 0; i < 45; ++i) + // Wait up to 10 seconds for the subscriber to connect + for (int i = 0; i < 100; ++i) { if (hello_world_subscriber.getSessions().at(0)->isConnected() && hello_world_publisher.getSubscriberCount() >= 1) diff --git a/thirdparty/asio/Module/Findasio.cmake b/thirdparty/asio/Module/Findasio.cmake index 10ba9ed..9277398 100644 --- a/thirdparty/asio/Module/Findasio.cmake +++ b/thirdparty/asio/Module/Findasio.cmake @@ -24,7 +24,7 @@ if(asio_FOUND) add_library(asio::asio INTERFACE IMPORTED) set_target_properties(asio::asio PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${asio_INCLUDE_DIR} - INTERFACE_COMPILE_DEFINITIONS ASIO_STANDALONE) + INTERFACE_COMPILE_DEFINITIONS "ASIO_STANDALONE;ASIO_NO_DEPRECATED") mark_as_advanced(asio_INCLUDE_DIR) endif() -endif() \ No newline at end of file +endif() From a2c12502f63ad4b026cbd483e1e80d6ea3e0af78 Mon Sep 17 00:00:00 2001 From: DownerCase Date: Mon, 7 Apr 2025 09:22:19 +0100 Subject: [PATCH 3/7] Rename io_service to io_context --- tcp_pubsub/src/executor_impl.cpp | 10 +++++----- tcp_pubsub/src/executor_impl.h | 4 ++-- tcp_pubsub/src/publisher_impl.h | 2 +- tcp_pubsub/src/publisher_session.cpp | 8 ++++---- tcp_pubsub/src/publisher_session.h | 4 ++-- tcp_pubsub/src/subscriber_session_impl.cpp | 10 +++++----- tcp_pubsub/src/subscriber_session_impl.h | 2 +- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/tcp_pubsub/src/executor_impl.cpp b/tcp_pubsub/src/executor_impl.cpp index b49ded9..c407f28 100644 --- a/tcp_pubsub/src/executor_impl.cpp +++ b/tcp_pubsub/src/executor_impl.cpp @@ -18,8 +18,8 @@ namespace tcp_pubsub { Executor_Impl::Executor_Impl(const logger::logger_t& log_function) : log_(log_function) - , io_service_(std::make_shared()) - , dummy_work_(std::make_shared(io_service_->get_executor())) + , io_context_(std::make_shared()) + , dummy_work_(std::make_shared(io_context_->get_executor())) { #if (TCP_PUBSUB_LOG_DEBUG_ENABLED) log_(logger::LogLevel::Debug, "Executor: Creating Executor."); @@ -76,7 +76,7 @@ namespace tcp_pubsub me->log_(logger::LogLevel::Debug, "Executor: IoService::Run() in thread " + thread_id); #endif - me->io_service_->run(); + me->io_context_->run(); #if (TCP_PUBSUB_LOG_DEBUG_ENABLED) me->log_(logger::LogLevel::Debug, "Executor: IoService: Shutdown of thread " + thread_id); @@ -95,12 +95,12 @@ namespace tcp_pubsub dummy_work_.reset(); // Stop the IO Service - io_service_->stop(); + io_context_->stop(); } std::shared_ptr Executor_Impl::ioService() const { - return io_service_; + return io_context_; } logger::logger_t Executor_Impl::logFunction() const diff --git a/tcp_pubsub/src/executor_impl.h b/tcp_pubsub/src/executor_impl.h index a409884..e42b57f 100644 --- a/tcp_pubsub/src/executor_impl.h +++ b/tcp_pubsub/src/executor_impl.h @@ -43,10 +43,10 @@ namespace tcp_pubsub private: const logger::logger_t log_; /// Logger - std::shared_ptr io_service_; /// global io service + std::shared_ptr io_context_; /// global io service std::vector thread_pool_; /// Asio threadpool executing the io servic using work_guard_t = asio::executor_work_guard; - std::shared_ptr dummy_work_; /// Dummy work, so the io_service will never run out of work and shut down, even if there is no publisher or subscriber at the moment + std::shared_ptr dummy_work_; /// Dummy work, so the io_context will never run out of work and shut down, even if there is no publisher or subscriber at the moment }; } diff --git a/tcp_pubsub/src/publisher_impl.h b/tcp_pubsub/src/publisher_impl.h index 6e2e152..f95de92 100644 --- a/tcp_pubsub/src/publisher_impl.h +++ b/tcp_pubsub/src/publisher_impl.h @@ -83,7 +83,7 @@ namespace tcp_pubsub std::atomic is_running_; /// Indicates whether this publisher is running and can send data. May be false, if e.g. binding to the given address has failed. // Asio - const std::shared_ptr executor_; /// Global Executor (holding the io_service and thread pool) + const std::shared_ptr executor_; /// Global Executor (holding the io_context and thread pool) asio::ip::tcp::acceptor acceptor_; /// Acceptor used for waiting for clients (i.e. subscribers) // Logger diff --git a/tcp_pubsub/src/publisher_session.cpp b/tcp_pubsub/src/publisher_session.cpp index e485cfd..380984e 100644 --- a/tcp_pubsub/src/publisher_session.cpp +++ b/tcp_pubsub/src/publisher_session.cpp @@ -30,15 +30,15 @@ namespace tcp_pubsub /// Constructor & Destructor ////////////////////////////////////////////// - PublisherSession::PublisherSession(const std::shared_ptr& io_service + PublisherSession::PublisherSession(const std::shared_ptr& io_context , const std::function&)>& session_closed_handler , const tcp_pubsub::logger::logger_t& log_function) - : io_service_ (io_service) + : io_context_ (io_context) , state_ (State::NotStarted) , session_closed_handler_ (session_closed_handler) , log_ (log_function) - , data_socket_ (*io_service_) - , data_strand_ (*io_service_) + , data_socket_ (*io_context_) + , data_strand_ (*io_context_) , sending_in_progress_ (false) { #if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED) diff --git a/tcp_pubsub/src/publisher_session.h b/tcp_pubsub/src/publisher_session.h index 79e84e8..78a0a25 100644 --- a/tcp_pubsub/src/publisher_session.h +++ b/tcp_pubsub/src/publisher_session.h @@ -36,7 +36,7 @@ namespace tcp_pubsub /// Constructor & Destructor ////////////////////////////////////////////// public: - PublisherSession(const std::shared_ptr& io_service + PublisherSession(const std::shared_ptr& io_context , const std::function&)>& session_closed_handler , const tcp_pubsub::logger::logger_t& log_function); @@ -97,7 +97,7 @@ namespace tcp_pubsub ////////////////////////////////////////////// private: // Asio IO Service - std::shared_ptr io_service_; + std::shared_ptr io_context_; // Whether the session has been canceled std::atomic state_; diff --git a/tcp_pubsub/src/subscriber_session_impl.cpp b/tcp_pubsub/src/subscriber_session_impl.cpp index 93e87b5..22d0e95 100644 --- a/tcp_pubsub/src/subscriber_session_impl.cpp +++ b/tcp_pubsub/src/subscriber_session_impl.cpp @@ -32,20 +32,20 @@ namespace tcp_pubsub /// Constructor & Destructor ////////////////////////////////////////////// - SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr& io_service + SubscriberSession_Impl::SubscriberSession_Impl(const std::shared_ptr& io_context , const std::vector>& publisher_list , int max_reconnection_attempts , const std::function>()>& get_buffer_handler , const std::function&)>& session_closed_handler , const tcp_pubsub::logger::logger_t& log_function) : publisher_list_ (publisher_list) - , resolver_ (*io_service) + , resolver_ (*io_context) , max_reconnection_attempts_(max_reconnection_attempts) , retries_left_ (max_reconnection_attempts) - , retry_timer_ (*io_service, std::chrono::seconds(1)) + , retry_timer_ (*io_context, std::chrono::seconds(1)) , canceled_ (false) - , data_socket_ (*io_service) - , data_strand_ (*io_service) + , data_socket_ (*io_context) + , data_strand_ (*io_context) , get_buffer_handler_ (get_buffer_handler) , session_closed_handler_ (session_closed_handler) , log_ (log_function) diff --git a/tcp_pubsub/src/subscriber_session_impl.h b/tcp_pubsub/src/subscriber_session_impl.h index 359692c..7ab8902 100644 --- a/tcp_pubsub/src/subscriber_session_impl.h +++ b/tcp_pubsub/src/subscriber_session_impl.h @@ -25,7 +25,7 @@ class SubscriberSession_Impl : public std::enable_shared_from_this& io_service + SubscriberSession_Impl(const std::shared_ptr& io_context , const std::vector>& publisher_list , int max_reconnection_attempts , const std::function>()>& get_buffer_handler From 658ea30a49463063d4ca178f84d686751ebc0e10 Mon Sep 17 00:00:00 2001 From: DownerCase Date: Mon, 7 Apr 2025 15:07:39 +0100 Subject: [PATCH 4/7] Don't push ASIO_NO_DEPRECATED to consumers --- .github/workflows/build-macos.yml | 1 + .github/workflows/build-ubuntu.yml | 3 ++- .github/workflows/build-windows.yml | 1 + thirdparty/asio/Module/Findasio.cmake | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 4be4cf0..d6132ab 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -35,6 +35,7 @@ jobs: -DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON \ -DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \ -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \ + -DCMAKE_CXX_FLAGS=ASIO_NO_DEPRECATED shell: bash - name: Build diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index b0e569e..a38c59f 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -64,6 +64,7 @@ jobs: -DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \ -DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} \ -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \ + -DCMAKE_CXX_FLAGS=ASIO_NO_DEPRECATED \ -DBUILD_SHARED_LIBS=${{ env.build_shared_libs }} - name: Build @@ -136,4 +137,4 @@ jobs: - name: Run integration test (Debug) run: ./integration_test working-directory: ${{ github.workspace }}/samples/integration_test/_build/debug - if: ${{ matrix.library_type != 'object' }} \ No newline at end of file + if: ${{ matrix.library_type != 'object' }} diff --git a/.github/workflows/build-windows.yml b/.github/workflows/build-windows.yml index d8b420b..a947aa3 100644 --- a/.github/workflows/build-windows.yml +++ b/.github/workflows/build-windows.yml @@ -71,6 +71,7 @@ jobs: -DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON ^ -DTCP_PUBSUB_USE_BUILTIN_GTEST=ON ^ -DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} ^ + -DCMAKE_CXX_FLAGS=ASIO_NO_DEPRECATED ^ -DCMAKE_INSTALL_PREFIX=${{env.INSTALL_PREFIX}} - name: Build (Release) diff --git a/thirdparty/asio/Module/Findasio.cmake b/thirdparty/asio/Module/Findasio.cmake index 9277398..e2e12de 100644 --- a/thirdparty/asio/Module/Findasio.cmake +++ b/thirdparty/asio/Module/Findasio.cmake @@ -24,7 +24,7 @@ if(asio_FOUND) add_library(asio::asio INTERFACE IMPORTED) set_target_properties(asio::asio PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${asio_INCLUDE_DIR} - INTERFACE_COMPILE_DEFINITIONS "ASIO_STANDALONE;ASIO_NO_DEPRECATED") + INTERFACE_COMPILE_DEFINITIONS ASIO_STANDALONE) mark_as_advanced(asio_INCLUDE_DIR) endif() endif() From 89c2adb530a6313023de3bd354ae1af730c4243b Mon Sep 17 00:00:00 2001 From: DownerCase Date: Wed, 9 Apr 2025 11:29:47 +0100 Subject: [PATCH 5/7] Use asio::post --- tcp_pubsub/src/subscriber_session_impl.cpp | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/tcp_pubsub/src/subscriber_session_impl.cpp b/tcp_pubsub/src/subscriber_session_impl.cpp index 22d0e95..f2fd32e 100644 --- a/tcp_pubsub/src/subscriber_session_impl.cpp +++ b/tcp_pubsub/src/subscriber_session_impl.cpp @@ -456,7 +456,7 @@ namespace tcp_pubsub me->log_(logger::LogLevel::DebugVerbose, "SubscriberSession " + me->endpointToString() + ": Received message of type \"RegularPayload\""); #endif // Call the callback first, ... - me->data_strand_.post([me, data_buffer, header]() + asio::post(me->data_strand_, [me, data_buffer, header]() { if (me->canceled_) { @@ -464,8 +464,7 @@ namespace tcp_pubsub return; } me->synchronous_callback_(data_buffer, header); - }, - asio::get_associated_allocator(me->data_strand_)); + }); } else @@ -476,11 +475,9 @@ namespace tcp_pubsub } // ... then start reading the next message - me->data_strand_.post([me]() - { - me->readHeaderLength(); - }, - asio::get_associated_allocator(me->data_strand_)); + asio::post(me->data_strand_, [me]() { + me->readHeaderLength(); + }); })); } @@ -496,11 +493,10 @@ namespace tcp_pubsub // - We can protect the variable with the data_strand => If the callback is currently running, the new callback will be applied afterwards // - We don't need an additional mutex, so a synchronous callback should actually be able to set another callback that gets activated once the current callback call ends // - Reading the next message will start once the callback call is finished. Therefore, read and callback are synchronized and the callback calls don't start stacking up - data_strand_.post([me = shared_from_this(), callback]() + asio::post(data_strand_, [me = shared_from_this(), callback]() { me->synchronous_callback_ = callback; - }, - asio::get_associated_allocator(data_strand_)); + }); } std::vector> SubscriberSession_Impl::getPublisherList() const From f9057198034b6c2cc36f1ef706dd6804682c31fe Mon Sep 17 00:00:00 2001 From: DownerCase Date: Wed, 9 Apr 2025 11:36:41 +0100 Subject: [PATCH 6/7] Fix CI mistake --- .github/workflows/build-macos.yml | 2 +- .github/workflows/build-ubuntu.yml | 2 +- .github/workflows/build-windows.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index d6132ab..0a0aa75 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -35,7 +35,7 @@ jobs: -DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON \ -DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \ -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \ - -DCMAKE_CXX_FLAGS=ASIO_NO_DEPRECATED + -DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED shell: bash - name: Build diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index a38c59f..00aad9f 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -64,7 +64,7 @@ jobs: -DTCP_PUBSUB_USE_BUILTIN_GTEST=ON \ -DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} \ -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \ - -DCMAKE_CXX_FLAGS=ASIO_NO_DEPRECATED \ + -DCMAKE_CXX_FLAGS=-DASIO_NO_DEPRECATED \ -DBUILD_SHARED_LIBS=${{ env.build_shared_libs }} - name: Build diff --git a/.github/workflows/build-windows.yml b/.github/workflows/build-windows.yml index a947aa3..ff92649 100644 --- a/.github/workflows/build-windows.yml +++ b/.github/workflows/build-windows.yml @@ -71,7 +71,7 @@ jobs: -DTCP_PUBSUB_USE_BUILTIN_RECYCLE=ON ^ -DTCP_PUBSUB_USE_BUILTIN_GTEST=ON ^ -DTCP_PUBSUB_LIBRARY_TYPE=${{env.tcp_pubsub_library_type}} ^ - -DCMAKE_CXX_FLAGS=ASIO_NO_DEPRECATED ^ + -DCMAKE_CXX_FLAGS=/DASIO_NO_DEPRECATED ^ -DCMAKE_INSTALL_PREFIX=${{env.INSTALL_PREFIX}} - name: Build (Release) From 7c030804d641736a6ede58a94870bd3c1304abb0 Mon Sep 17 00:00:00 2001 From: DownerCase Date: Wed, 9 Apr 2025 12:34:01 +0100 Subject: [PATCH 7/7] Try updating vs toolset? --- .github/workflows/build-windows.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-windows.yml b/.github/workflows/build-windows.yml index ff92649..c31f3ac 100644 --- a/.github/workflows/build-windows.yml +++ b/.github/workflows/build-windows.yml @@ -9,8 +9,8 @@ env: # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) INSTALL_PREFIX: _install PROJECT_NAME: tcp_pubsub - VS_TOOLSET: v140 - VS_NAME: vs2015 + VS_TOOLSET: v141 + VS_NAME: vs2017 jobs: build-windows: