Skip to content

Commit f5e08c2

Browse files
Fix transient local IPC publish (#2708)
* Fix transient local publish when inter and intra process communications are both present. * Apply the fix to TypeAdapted signature * Add an executor to intra_process_inter_process_mix_transient_local test case to enable inter process publishing test Signed-off-by: Jeffery Hsu <[email protected]> Co-authored-by: Tomoya Fujita <[email protected]>
1 parent 016cfea commit f5e08c2

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

rclcpp/include/rclcpp/publisher.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,12 @@ class Publisher : public PublisherBase
235235
// interprocess publish, resulting in lower publish-to-subscribe latency.
236236
// It's not possible to do that with an unique_ptr,
237237
// as do_intra_process_publish takes the ownership of the message.
238+
239+
// When durability is set to TransientLocal (i.e. there is a buffer),
240+
// inter process publish should always take place to ensure
241+
// late joiners receive past data.
238242
bool inter_process_publish_needed =
239-
get_subscription_count() > get_intra_process_subscription_count();
243+
get_subscription_count() > get_intra_process_subscription_count() || buffer_;
240244

241245
if (inter_process_publish_needed) {
242246
auto shared_msg =
@@ -313,8 +317,11 @@ class Publisher : public PublisherBase
313317
return;
314318
}
315319

320+
// When durability is set to TransientLocal (i.e. there is a buffer),
321+
// inter process publish should always take place to ensure
322+
// late joiners receive past data.
316323
bool inter_process_publish_needed =
317-
get_subscription_count() > get_intra_process_subscription_count();
324+
get_subscription_count() > get_intra_process_subscription_count() || buffer_;
318325

319326
if (inter_process_publish_needed) {
320327
auto ros_msg_ptr = std::make_shared<ROSMessageType>();

rclcpp/test/rclcpp/test_publisher.cpp

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <gtest/gtest.h>
1616

17+
#include <future>
1718
#include <chrono>
1819
#include <memory>
1920
#include <string>
@@ -725,11 +726,7 @@ TEST_F(TestPublisher, intra_process_transient_local) {
725726
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {};
726727
struct IntraProcessCallback
727728
{
728-
void callback_fun(size_t s)
729-
{
730-
(void) s;
731-
called = true;
732-
}
729+
void callback_fun(size_t) {called = true;}
733730
bool called = false;
734731
};
735732
rclcpp::SubscriptionOptions sub_options_ipm_disabled;
@@ -786,3 +783,44 @@ TEST_F(TestPublisher, intra_process_transient_local) {
786783
EXPECT_FALSE(callback3.called);
787784
EXPECT_FALSE(callback4.called);
788785
}
786+
787+
TEST_F(TestPublisher, intra_process_inter_process_mix_transient_local) {
788+
constexpr auto history_depth = 10u;
789+
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));
790+
791+
rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> pub_options_ipm_enabled;
792+
pub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
793+
794+
auto pub_ipm_enabled_transient_local_enabled = node->create_publisher<test_msgs::msg::Empty>(
795+
"topic1",
796+
rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_enabled);
797+
798+
test_msgs::msg::Empty msg;
799+
pub_ipm_enabled_transient_local_enabled->publish(msg);
800+
801+
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {};
802+
struct SubscriptionCallback
803+
{
804+
void callback_fun(size_t) {called.set_value();}
805+
std::promise<void> called;
806+
};
807+
rclcpp::SubscriptionOptions sub_options_ipm_disabled;
808+
sub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
809+
SubscriptionCallback intra_callback, inter_callback;
810+
std::future<void> intra_callback_future = intra_callback.called.get_future(),
811+
inter_callback_future = inter_callback.called.get_future();
812+
auto sub_ipm_disabled_transient_local_enabled = node->create_subscription<test_msgs::msg::Empty>(
813+
"topic1",
814+
rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(),
815+
do_nothing, sub_options_ipm_disabled);
816+
sub_ipm_disabled_transient_local_enabled->set_on_new_intra_process_message_callback(
817+
std::bind(&SubscriptionCallback::callback_fun, &intra_callback, std::placeholders::_1));
818+
sub_ipm_disabled_transient_local_enabled->set_on_new_message_callback(
819+
std::bind(&SubscriptionCallback::callback_fun, &inter_callback, std::placeholders::_1));
820+
rclcpp::executors::SingleThreadedExecutor executor;
821+
executor.add_node(node);
822+
EXPECT_EQ(executor.spin_until_future_complete(inter_callback_future,
823+
std::chrono::milliseconds(100)), rclcpp::FutureReturnCode::SUCCESS);
824+
EXPECT_EQ(executor.spin_until_future_complete(intra_callback_future,
825+
std::chrono::milliseconds(100)), rclcpp::FutureReturnCode::TIMEOUT);
826+
}

0 commit comments

Comments
 (0)