Skip to content

Commit eaeda0f

Browse files
mauropasseMauro Passerino
andauthored
Humble backport and new fixes (#154)
* Always publish inter-process on TRANSIENT_LOCAL pubs (#152) * Backport upstream Action fixes - ros2#2531 * Fix actions feedback race - ros2#2451 * Fix data race in Actions: Part 3 --------- Co-authored-by: Mauro Passerino <[email protected]>
1 parent c2acb19 commit eaeda0f

File tree

4 files changed

+576
-251
lines changed

4 files changed

+576
-251
lines changed

rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -577,11 +577,14 @@ class IntraProcessManager
577577
auto server = get_matching_intra_process_action_server<ActionT>(ipc_action_client_id);
578578

579579
if (server) {
580+
{
581+
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
582+
clients_uuid_to_id_[goal_id] = ipc_action_client_id;
583+
}
584+
580585
server->store_ipc_action_goal_request(
581586
ipc_action_client_id, std::move(goal_request));
582587

583-
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
584-
clients_uuid_to_id_[goal_id] = ipc_action_client_id;
585588
return true;
586589
}
587590
return false;

rclcpp/include/rclcpp/publisher.hpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,11 @@ class Publisher : public PublisherBase
351351
bool inter_process_publish_needed =
352352
get_subscription_count() > get_intra_process_subscription_count();
353353

354-
if (inter_process_publish_needed) {
354+
// If the publisher is configured with transient local durability, we must publish
355+
// inter-process. This ensures that the RMW stores the messages for late joiner subscriptions.
356+
// This has the consequence of subscriptions experiencing the double-delivery issue
357+
// mentioned in https://github.com/ros2/rclcpp/issues/1750
358+
if (inter_process_publish_needed || buffer_) {
355359
auto ros_msg_ptr = std::make_shared<ROSMessageType>();
356360
// TODO(clalancette): This is unnecessarily doing an additional conversion
357361
// that may have already been done in do_intra_process_publish_and_return_shared().
@@ -363,11 +367,6 @@ class Publisher : public PublisherBase
363367
buffer_->add_shared(ros_msg_ptr);
364368
}
365369
} else {
366-
if (buffer_) {
367-
auto ros_msg_ptr = std::make_shared<ROSMessageType>();
368-
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
369-
buffer_->add_shared(ros_msg_ptr);
370-
}
371370
this->do_intra_process_publish(std::move(msg));
372371
}
373372
}

0 commit comments

Comments
 (0)