Skip to content

Commit dd0cd98

Browse files
Alberto Soragnaalsora
authored andcommitted
allow to set ipc sub callback from regular subscription
Signed-off-by: Alberto Soragna <[email protected]>
1 parent e151cc2 commit dd0cd98

File tree

3 files changed

+121
-6
lines changed

3 files changed

+121
-6
lines changed

rclcpp/include/rclcpp/subscription.hpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -343,11 +343,6 @@ class Subscription : public SubscriptionBase
343343
message_memory_strategy_;
344344
/// Component which computes and publishes topic statistics for this subscriber
345345
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
346-
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
347-
CallbackMessageT,
348-
AllocatorT,
349-
typename MessageUniquePtr::deleter_type>;
350-
std::shared_ptr<SubscriptionIntraProcessT> subscription_intra_process_;
351346
};
352347

353348
} // namespace rclcpp

rclcpp/include/rclcpp/subscription_base.hpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,55 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
346346
on_new_message_callback_ = nullptr;
347347
}
348348

349+
/// Set a callback to be called when each new intra-process message is received.
350+
/**
351+
* The callback receives a size_t which is the number of messages received
352+
* since the last time this callback was called.
353+
* Normally this is 1, but can be > 1 if messages were received before any
354+
* callback was set.
355+
*
356+
* Calling it again will clear any previously set callback.
357+
*
358+
* This function is thread-safe.
359+
*
360+
* If you want more information available in the callback, like the subscription
361+
* or other information, you may use a lambda with captures or std::bind.
362+
*
363+
* \sa rclcpp::SubscriptionIntraProcessBase::set_on_ready_callback
364+
*
365+
* \param[in] callback functor to be called when a new message is received
366+
*/
367+
void
368+
set_on_new_intra_process_message_callback(std::function<void(size_t)> callback)
369+
{
370+
if (!use_intra_process_) {
371+
RCLCPP_WARN(
372+
rclcpp::get_logger("rclcpp"),
373+
"Calling set_on_new_intra_process_message_callback for subscription with IPC disabled");
374+
return;
375+
}
376+
377+
// The on_ready_callback signature has an extra `int` argument used to disambiguate between
378+
// possible different entities within a generic waitable.
379+
// We hide that detail to users of this method.
380+
std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
381+
subscription_intra_process_->set_on_ready_callback(new_callback);
382+
}
383+
384+
/// Unset the callback registered for new intra-process messages, if any.
385+
void
386+
clear_on_new_intra_process_message_callback()
387+
{
388+
if (!use_intra_process_) {
389+
RCLCPP_WARN(
390+
rclcpp::get_logger("rclcpp"),
391+
"Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled");
392+
return;
393+
}
394+
395+
subscription_intra_process_->clear_on_ready_callback();
396+
}
397+
349398
protected:
350399
template<typename EventCallbackT>
351400
void
@@ -386,6 +435,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
386435
bool use_intra_process_;
387436
IntraProcessManagerWeakPtr weak_ipm_;
388437
uint64_t intra_process_subscription_id_;
438+
std::shared_ptr<SubscriptionIntraProcessBase> subscription_intra_process_;
389439

390440
private:
391441
RCLCPP_DISABLE_COPY(SubscriptionBase)

rclcpp/test/rclcpp/test_subscription.cpp

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ TEST_F(TestSubscription, handle_loaned_message) {
444444
Testing on_new_message callbacks.
445445
*/
446446
TEST_F(TestSubscription, on_new_message_callback) {
447-
initialize();
447+
initialize(rclcpp::NodeOptions().use_intra_process_comms(false));
448448
using test_msgs::msg::Empty;
449449

450450
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {FAIL();};
@@ -510,6 +510,76 @@ TEST_F(TestSubscription, on_new_message_callback) {
510510
EXPECT_THROW(sub->set_on_new_message_callback(invalid_cb), std::invalid_argument);
511511
}
512512

513+
/*
514+
Testing on_new_intra_process_message callbacks.
515+
*/
516+
TEST_F(TestSubscription, on_new_intra_process_message_callback) {
517+
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));
518+
using test_msgs::msg::Empty;
519+
520+
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {FAIL();};
521+
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
522+
523+
std::atomic<size_t> c1 {0};
524+
auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;};
525+
sub->set_on_new_intra_process_message_callback(increase_c1_cb);
526+
527+
auto pub = node->create_publisher<test_msgs::msg::Empty>("~/test_take", 1);
528+
{
529+
test_msgs::msg::Empty msg;
530+
pub->publish(msg);
531+
}
532+
533+
auto start = std::chrono::steady_clock::now();
534+
do {
535+
std::this_thread::sleep_for(100ms);
536+
} while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s);
537+
538+
EXPECT_EQ(c1.load(), 1u);
539+
540+
std::atomic<size_t> c2 {0};
541+
auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;};
542+
sub->set_on_new_intra_process_message_callback(increase_c2_cb);
543+
544+
{
545+
test_msgs::msg::Empty msg;
546+
pub->publish(msg);
547+
}
548+
549+
start = std::chrono::steady_clock::now();
550+
do {
551+
std::this_thread::sleep_for(100ms);
552+
} while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s);
553+
554+
EXPECT_EQ(c1.load(), 1u);
555+
EXPECT_EQ(c2.load(), 1u);
556+
557+
sub->clear_on_new_intra_process_message_callback();
558+
559+
{
560+
test_msgs::msg::Empty msg;
561+
pub->publish(msg);
562+
pub->publish(msg);
563+
pub->publish(msg);
564+
}
565+
566+
std::atomic<size_t> c3 {0};
567+
auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;};
568+
sub->set_on_new_intra_process_message_callback(increase_c3_cb);
569+
570+
start = std::chrono::steady_clock::now();
571+
do {
572+
std::this_thread::sleep_for(100ms);
573+
} while (c3 == 0 && std::chrono::steady_clock::now() - start < 10s);
574+
575+
EXPECT_EQ(c1.load(), 1u);
576+
EXPECT_EQ(c2.load(), 1u);
577+
EXPECT_EQ(c3.load(), 3u);
578+
579+
std::function<void(size_t)> invalid_cb = nullptr;
580+
EXPECT_THROW(sub->set_on_new_intra_process_message_callback(invalid_cb), std::invalid_argument);
581+
}
582+
513583
/*
514584
Testing subscription with intraprocess enabled and invalid QoS
515585
*/

0 commit comments

Comments
 (0)