Skip to content

Commit 543b2f9

Browse files
committed
Design changes that move most transient local publish functionalities out of
intra process manager into intra process manager Signed-off-by: Jeffery Hsu <[email protected]>
1 parent 38fc4d2 commit 543b2f9

File tree

7 files changed

+320
-151
lines changed

7 files changed

+320
-151
lines changed

rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "rclcpp/experimental/action_server_intra_process_base.hpp"
3535
#include "rclcpp/experimental/client_intra_process.hpp"
3636
#include "rclcpp/experimental/client_intra_process_base.hpp"
37+
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
3738
#include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
3839
#include "rclcpp/experimental/service_intra_process.hpp"
3940
#include "rclcpp/experimental/service_intra_process_base.hpp"
@@ -120,9 +121,76 @@ class IntraProcessManager
120121
* \param subscription the SubscriptionIntraProcess to register.
121122
* \return an unsigned 64-bit integer which is the subscription's unique id.
122123
*/
124+
template<
125+
typename ROSMessageType,
126+
typename Alloc = std::allocator<ROSMessageType>
127+
>
123128
RCLCPP_PUBLIC
124129
uint64_t
125-
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
130+
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
131+
{
132+
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
133+
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
134+
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
135+
136+
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
137+
138+
uint64_t sub_id = IntraProcessManager::get_next_unique_id();
139+
140+
subscriptions_[sub_id] = subscription;
141+
142+
// adds the subscription id to all the matchable publishers
143+
for (auto & pair : publishers_) {
144+
auto publisher = pair.second.lock();
145+
if (!publisher) {
146+
continue;
147+
}
148+
if (can_communicate(publisher, subscription)) {
149+
uint64_t pub_id = pair.first;
150+
insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
151+
if (publisher->is_durability_transient_local() &&
152+
subscription->is_durability_transient_local())
153+
{
154+
auto publisher_buffer = publisher_buffers_[pub_id].lock();
155+
if (!publisher_buffer) {
156+
throw std::runtime_error("publisher buffer has unexpectedly gone out of scope");
157+
}
158+
auto buffer = std::dynamic_pointer_cast<
159+
rclcpp::experimental::buffers::IntraProcessBuffer<
160+
ROSMessageType,
161+
ROSMessageTypeAllocator,
162+
ROSMessageTypeDeleter
163+
>
164+
>(publisher_buffer);
165+
if (!buffer) {
166+
throw std::runtime_error(
167+
"failed to dynamic cast publisher's IntraProcessBufferBase to "
168+
"IntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
169+
"ROSMessageTypeDeleter> which can happen when the publisher and "
170+
"subscription use different allocator types, which is not supported");
171+
}
172+
if (subscription->use_take_shared_method()) {
173+
auto data_vec = buffer->get_all_data_shared();
174+
for (auto shared_data : data_vec) {
175+
this->template add_shared_msg_to_buffer<
176+
ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
177+
shared_data, sub_id);
178+
}
179+
} else {
180+
auto data_vec = buffer->get_all_data_unique();
181+
for (auto & owned_data : data_vec) {
182+
auto allocator = ROSMessageTypeAllocator();
183+
this->template add_owned_msg_to_buffer<
184+
ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
185+
std::move(owned_data), sub_id, allocator);
186+
}
187+
}
188+
}
189+
}
190+
}
191+
192+
return sub_id;
193+
}
126194

127195
/// Register an intra-process client with the manager, returns the client unique id.
128196
/**
@@ -218,7 +286,10 @@ class IntraProcessManager
218286
*/
219287
RCLCPP_PUBLIC
220288
uint64_t
221-
add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
289+
add_publisher(
290+
rclcpp::PublisherBase::SharedPtr publisher,
291+
rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer =
292+
rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr());
222293

223294
/// Unregister a publisher using the publisher's unique id.
224295
/**
@@ -789,6 +860,9 @@ class IntraProcessManager
789860
using PublisherMap =
790861
std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;
791862

863+
using PublisherBufferMap =
864+
std::unordered_map<uint64_t, rclcpp::experimental::buffers::IntraProcessBufferBase::WeakPtr>;
865+
792866
using PublisherToSubscriptionIdsMap =
793867
std::unordered_map<uint64_t, SplittedSubscriptions>;
794868

@@ -1023,6 +1097,7 @@ class IntraProcessManager
10231097
ActionClientToServerIdsMap action_clients_to_servers_;
10241098

10251099
std::unordered_map<size_t, uint64_t> clients_uuid_to_id_;
1100+
PublisherBufferMap publisher_buffers_;
10261101

10271102
mutable std::shared_timed_mutex mutex_;
10281103
};

rclcpp/include/rclcpp/publisher.hpp

Lines changed: 4 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,11 @@ class Publisher : public PublisherBase
112112
[[deprecated("use std::shared_ptr<const PublishedType>")]] =
113113
std::shared_ptr<const PublishedType>;
114114

115-
using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
115+
using BufferSharedPtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
116116
ROSMessageType,
117117
ROSMessageTypeAllocator,
118118
ROSMessageTypeDeleter
119-
>::UniquePtr;
119+
>::SharedPtr;
120120

121121
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)
122122

@@ -211,7 +211,7 @@ class Publisher : public PublisherBase
211211
qos,
212212
std::make_shared<ROSMessageTypeAllocator>(ros_message_type_allocator_));
213213
}
214-
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
214+
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), buffer_);
215215
this->setup_intra_process(
216216
intra_process_publisher_id,
217217
ipm);
@@ -470,53 +470,6 @@ class Publisher : public PublisherBase
470470
return ros_message_type_allocator_;
471471
}
472472

473-
/// Publish shared messages from intra process buffer for late joiner
474-
/**
475-
* This signature allows the user to give a sub_id from intra process manager.
476-
* The function will publish messages currently held in buffer_ to the subscription
477-
* as shared pointers.
478-
*
479-
* \param[in] sud_id subscription id in ipm to publish data to
480-
*/
481-
void do_shared_intra_process_publish_for_late_joiner(const uint64_t sub_id) override
482-
{
483-
auto ipm = weak_ipm_.lock();
484-
if (!buffer_ || !ipm) {
485-
throw std::runtime_error(
486-
"intra process publish for late joiner called "
487-
"after destruction of intra process manager and/or transient_local buffer");
488-
}
489-
auto data_vec = buffer_->get_all_data_shared();
490-
for (auto shared_data : data_vec) {
491-
ipm->template add_shared_msg_to_buffer<
492-
ROSMessageType, AllocatorT, ROSMessageTypeDeleter, ROSMessageType>(shared_data, sub_id);
493-
}
494-
}
495-
496-
/// Publish owned messages from intra process buffer for late joiner
497-
/**
498-
* This signature allows the user to give a sub_id from intra process manager.
499-
* The function will publish messages currently held in buffer_ to the subscription
500-
* as unique pointers.
501-
*
502-
* \param[in] sud_id subscription id in ipm to publish data to
503-
*/
504-
void do_unique_intra_process_publish_for_late_joiner(const uint64_t sub_id) override
505-
{
506-
auto ipm = weak_ipm_.lock();
507-
if (!buffer_ || !ipm) {
508-
throw std::runtime_error(
509-
"intra process publish for late joiner called "
510-
"after destruction of intra process manager and/or transient_local buffer");
511-
}
512-
auto data_vec = buffer_->get_all_data_unique();
513-
for (auto & owned_data : data_vec) {
514-
ipm->template add_owned_msg_to_buffer<
515-
ROSMessageType, AllocatorT, ROSMessageTypeDeleter, ROSMessageType>(
516-
std::move(owned_data), sub_id, ros_message_type_allocator_);
517-
}
518-
}
519-
520473
protected:
521474
void
522475
do_inter_process_publish(const ROSMessageType & msg)
@@ -669,7 +622,7 @@ class Publisher : public PublisherBase
669622
ROSMessageTypeAllocator ros_message_type_allocator_;
670623
ROSMessageTypeDeleter ros_message_type_deleter_;
671624

672-
BufferUniquePtr buffer_;
625+
BufferSharedPtr buffer_{nullptr};
673626
};
674627

675628
} // namespace rclcpp

rclcpp/include/rclcpp/publisher_base.hpp

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -319,27 +319,6 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
319319
event_handlers_[event_type]->clear_on_ready_callback();
320320
}
321321

322-
323-
/// Publish shared messages from intra process buffer for late joiner
324-
/**
325-
* This signature allows the user to give a sub_id from intra process manager.
326-
* The function will publish messages currently held in buffer_ to the subscription
327-
* as shared pointers.
328-
*
329-
* \param[in] sud_id subscription id in ipm to publish data to
330-
*/
331-
virtual void do_shared_intra_process_publish_for_late_joiner(const uint64_t sub_id);
332-
333-
/// Publish owned messages from intra process buffer for late joiner
334-
/**
335-
* This signature allows the user to give a sub_id from intra process manager.
336-
* The function will publish messages currently held in buffer_ to the subscription
337-
* as unique pointers.
338-
*
339-
* \param[in] sud_id subscription id in ipm to publish data to
340-
*/
341-
virtual void do_unique_intra_process_publish_for_late_joiner(const uint64_t sub_id);
342-
343322
protected:
344323
template<typename EventCallbackT>
345324
void

rclcpp/include/rclcpp/subscription.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,8 @@ class Subscription : public SubscriptionBase
218218
// Add it to the intra process manager.
219219
using rclcpp::experimental::IntraProcessManager;
220220
auto ipm = context->get_sub_context<IntraProcessManager>();
221-
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_);
221+
uint64_t intra_process_subscription_id = ipm->template add_subscription<
222+
ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_);
222223
this->setup_intra_process(intra_process_subscription_id, ipm);
223224
}
224225

rclcpp/src/rclcpp/intra_process_manager.cpp

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,24 @@ namespace experimental
2626
static std::atomic<uint64_t> _next_unique_id {1};
2727

2828
uint64_t
29-
IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
29+
IntraProcessManager::add_publisher(
30+
rclcpp::PublisherBase::SharedPtr publisher,
31+
rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer)
3032
{
3133
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
3234

3335
uint64_t pub_id = IntraProcessManager::get_next_unique_id();
3436

3537
publishers_[pub_id] = publisher;
38+
if (publisher->is_durability_transient_local()) {
39+
if (buffer) {
40+
publisher_buffers_[pub_id] = buffer;
41+
} else {
42+
throw std::runtime_error(
43+
"transient_local publisher needs to pass"
44+
"a valid publisher buffer ptr when calling add_publisher()");
45+
}
46+
}
3647

3748
// Initialize the subscriptions storage for this publisher.
3849
pub_to_subs_[pub_id] = SplittedSubscriptions();
@@ -52,39 +63,6 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
5263
return pub_id;
5364
}
5465

55-
uint64_t
56-
IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
57-
{
58-
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
59-
60-
uint64_t sub_id = IntraProcessManager::get_next_unique_id();
61-
62-
subscriptions_[sub_id] = subscription;
63-
64-
// adds the subscription id to all the matchable publishers
65-
for (auto & pair : publishers_) {
66-
auto publisher = pair.second.lock();
67-
if (!publisher) {
68-
continue;
69-
}
70-
if (can_communicate(publisher, subscription)) {
71-
uint64_t pub_id = pair.first;
72-
insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
73-
if (publisher->is_durability_transient_local() &&
74-
subscription->is_durability_transient_local())
75-
{
76-
if (subscription->use_take_shared_method()) {
77-
publisher->do_shared_intra_process_publish_for_late_joiner(sub_id);
78-
} else {
79-
publisher->do_unique_intra_process_publish_for_late_joiner(sub_id);
80-
}
81-
}
82-
}
83-
}
84-
85-
return sub_id;
86-
}
87-
8866
uint64_t
8967
IntraProcessManager::add_intra_process_client(ClientIntraProcessBase::SharedPtr client)
9068
{

rclcpp/src/rclcpp/publisher_base.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,3 @@ std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoin
317317
return network_flow_endpoint_vector;
318318
}
319319

320-
void PublisherBase::do_shared_intra_process_publish_for_late_joiner(const uint64_t)
321-
{
322-
throw std::runtime_error("intra process publish for late joiner is not implemented");
323-
}
324-
void PublisherBase::do_unique_intra_process_publish_for_late_joiner(const uint64_t)
325-
{
326-
throw std::runtime_error("intra process publish for late joiner is not implemented");
327-
}

0 commit comments

Comments
 (0)