@@ -129,10 +129,6 @@ class IntraProcessManager
129129 uint64_t
130130 add_subscription (rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
131131 {
132- using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
133- using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
134- using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
135-
136132 std::unique_lock<std::shared_timed_mutex> lock (mutex_);
137133
138134 uint64_t sub_id = IntraProcessManager::get_next_unique_id ();
@@ -151,40 +147,9 @@ class IntraProcessManager
151147 if (publisher->is_durability_transient_local () &&
152148 subscription->is_durability_transient_local ())
153149 {
154- auto publisher_buffer = publisher_buffers_[pub_id].lock ();
155- if (!publisher_buffer) {
156- throw std::runtime_error (" publisher buffer has unexpectedly gone out of scope" );
157- }
158- auto buffer = std::dynamic_pointer_cast<
159- rclcpp::experimental::buffers::IntraProcessBuffer<
160- ROSMessageType,
161- ROSMessageTypeAllocator,
162- ROSMessageTypeDeleter
163- >
164- >(publisher_buffer);
165- if (!buffer) {
166- throw std::runtime_error (
167- " failed to dynamic cast publisher's IntraProcessBufferBase to "
168- " IntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
169- " ROSMessageTypeDeleter> which can happen when the publisher and "
170- " subscription use different allocator types, which is not supported" );
171- }
172- if (subscription->use_take_shared_method ()) {
173- auto data_vec = buffer->get_all_data_shared ();
174- for (auto shared_data : data_vec) {
175- this ->template add_shared_msg_to_buffer <
176- ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
177- shared_data, sub_id);
178- }
179- } else {
180- auto data_vec = buffer->get_all_data_unique ();
181- for (auto & owned_data : data_vec) {
182- auto allocator = ROSMessageTypeAllocator ();
183- this ->template add_owned_msg_to_buffer <
184- ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
185- std::move (owned_data), sub_id, allocator);
186- }
187- }
150+ do_transient_local_publish<ROSMessageType, Alloc>(
151+ pub_id, sub_id,
152+ subscription->use_take_shared_method ());
188153 }
189154 }
190155 }
@@ -911,6 +876,55 @@ class IntraProcessManager
911876 rclcpp::experimental::ActionClientIntraProcessBase::SharedPtr client,
912877 rclcpp::experimental::ActionServerIntraProcessBase::SharedPtr service) const ;
913878
879+ template <
880+ typename ROSMessageType,
881+ typename Alloc = std::allocator<ROSMessageType>
882+ >
883+ RCLCPP_PUBLIC
884+ void do_transient_local_publish (
885+ const uint64_t pub_id, const uint64_t sub_id,
886+ const bool use_take_shared_method)
887+ {
888+ using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
889+ using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
890+ using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
891+
892+ auto publisher_buffer = publisher_buffers_[pub_id].lock ();
893+ if (!publisher_buffer) {
894+ throw std::runtime_error (" publisher buffer has unexpectedly gone out of scope" );
895+ }
896+ auto buffer = std::dynamic_pointer_cast<
897+ rclcpp::experimental::buffers::IntraProcessBuffer<
898+ ROSMessageType,
899+ ROSMessageTypeAllocator,
900+ ROSMessageTypeDeleter
901+ >
902+ >(publisher_buffer);
903+ if (!buffer) {
904+ throw std::runtime_error (
905+ " failed to dynamic cast publisher's IntraProcessBufferBase to "
906+ " IntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
907+ " ROSMessageTypeDeleter> which can happen when the publisher and "
908+ " subscription use different allocator types, which is not supported" );
909+ }
910+ if (use_take_shared_method) {
911+ auto data_vec = buffer->get_all_data_shared ();
912+ for (auto shared_data : data_vec) {
913+ this ->template add_shared_msg_to_buffer <
914+ ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
915+ shared_data, sub_id);
916+ }
917+ } else {
918+ auto data_vec = buffer->get_all_data_unique ();
919+ for (auto & owned_data : data_vec) {
920+ auto allocator = ROSMessageTypeAllocator ();
921+ this ->template add_owned_msg_to_buffer <
922+ ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
923+ std::move (owned_data), sub_id, allocator);
924+ }
925+ }
926+ }
927+
914928 template <
915929 typename MessageT,
916930 typename Alloc,
0 commit comments