Skip to content

Commit cb08c79

Browse files
Implement matched event (#2105)
Signed-off-by: Barry Xu <[email protected]>
1 parent bff5992 commit cb08c79

File tree

5 files changed

+221
-10
lines changed

5 files changed

+221
-10
lines changed

rclcpp/include/rclcpp/event_handler.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ using QOSOfferedIncompatibleQoSInfo = rmw_offered_qos_incompatible_event_status_
4747
using QOSRequestedIncompatibleQoSInfo = rmw_requested_qos_incompatible_event_status_t;
4848

4949
using IncompatibleTypeInfo = rmw_incompatible_type_status_t;
50+
using MatchedInfo = rmw_matched_status_t;
5051

5152
using QOSDeadlineRequestedCallbackType = std::function<void (QOSDeadlineRequestedInfo &)>;
5253
using QOSDeadlineOfferedCallbackType = std::function<void (QOSDeadlineOfferedInfo &)>;
@@ -58,6 +59,8 @@ using QOSRequestedIncompatibleQoSCallbackType =
5859
std::function<void (QOSRequestedIncompatibleQoSInfo &)>;
5960

6061
using IncompatibleTypeCallbackType = std::function<void (IncompatibleTypeInfo &)>;
62+
using PublisherMatchedCallbackType = std::function<void (MatchedInfo &)>;
63+
using SubscriptionMatchedCallbackType = std::function<void (MatchedInfo &)>;
6164

6265
/// Contains callbacks for various types of events a Publisher can receive from the middleware.
6366
struct PublisherEventCallbacks
@@ -66,6 +69,7 @@ struct PublisherEventCallbacks
6669
QOSLivelinessLostCallbackType liveliness_callback;
6770
QOSOfferedIncompatibleQoSCallbackType incompatible_qos_callback;
6871
IncompatibleTypeCallbackType incompatible_type_callback;
72+
PublisherMatchedCallbackType matched_callback;
6973
};
7074

7175
/// Contains callbacks for non-message events that a Subscription can receive from the middleware.
@@ -76,6 +80,7 @@ struct SubscriptionEventCallbacks
7680
QOSRequestedIncompatibleQoSCallbackType incompatible_qos_callback;
7781
QOSMessageLostCallbackType message_lost_callback;
7882
IncompatibleTypeCallbackType incompatible_type_callback;
83+
SubscriptionMatchedCallbackType matched_callback;
7984
};
8085

8186
class UnsupportedEventTypeException : public exceptions::RCLErrorBase, public std::runtime_error

rclcpp/src/rclcpp/publisher_base.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ PublisherBase::bind_event_callbacks(
183183
rclcpp::get_logger("rclcpp"),
184184
"Failed to add event handler for incompatible type; wrong callback type");
185185
}
186+
if (event_callbacks.matched_callback) {
187+
this->add_event_handler(
188+
event_callbacks.matched_callback,
189+
RCL_PUBLISHER_MATCHED);
190+
}
186191
}
187192

188193
size_t

rclcpp/src/rclcpp/subscription_base.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ SubscriptionBase::bind_event_callbacks(
161161
event_callbacks.message_lost_callback,
162162
RCL_SUBSCRIPTION_MESSAGE_LOST);
163163
}
164+
if (event_callbacks.matched_callback) {
165+
this->add_event_handler(
166+
event_callbacks.matched_callback,
167+
RCL_SUBSCRIPTION_MATCHED);
168+
}
164169
}
165170

166171
const char *

rclcpp/test/rclcpp/CMakeLists.txt

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -444,17 +444,23 @@ function(test_generic_pubsub_for_rmw_implementation)
444444
endif()
445445
endfunction()
446446
call_for_each_rmw_implementation(test_generic_pubsub_for_rmw_implementation)
447-
ament_add_gtest(test_qos_event test_qos_event.cpp)
448-
if(TARGET test_qos_event)
449-
ament_target_dependencies(test_qos_event
450-
"rmw"
451-
"test_msgs"
452-
)
453-
target_link_libraries(test_qos_event
454-
${PROJECT_NAME}
455-
mimick
447+
448+
function(test_qos_event_for_rmw_implementation)
449+
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})
450+
ament_add_gmock(test_qos_event${target_suffix} test_qos_event.cpp
451+
ENV ${rmw_implementation_env_var}
456452
)
457-
endif()
453+
if(TARGET test_qos_event${target_suffix})
454+
target_link_libraries(test_qos_event${target_suffix} ${PROJECT_NAME} mimick)
455+
ament_target_dependencies(test_qos_event${target_suffix}
456+
"rmw"
457+
"rosidl_typesupport_cpp"
458+
"test_msgs"
459+
)
460+
endif()
461+
endfunction()
462+
call_for_each_rmw_implementation(test_qos_event_for_rmw_implementation)
463+
458464
ament_add_gmock(test_qos_overriding_options test_qos_overriding_options.cpp)
459465
if(TARGET test_qos_overriding_options)
460466
target_link_libraries(test_qos_overriding_options

rclcpp/test/rclcpp/test_qos_event.cpp

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <gtest/gtest.h>
1616

17+
#include <atomic>
1718
#include <chrono>
1819
#include <functional>
1920
#include <future>
@@ -313,6 +314,11 @@ TEST_F(TestQosEvent, add_to_wait_set) {
313314

314315
TEST_F(TestQosEvent, test_on_new_event_callback)
315316
{
317+
// rmw_connextdds doesn't support rmw_event_set_callback() interface
318+
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
319+
GTEST_SKIP();
320+
}
321+
316322
auto offered_deadline = rclcpp::Duration(std::chrono::milliseconds(1));
317323
auto requested_deadline = rclcpp::Duration(std::chrono::milliseconds(2));
318324

@@ -354,6 +360,11 @@ TEST_F(TestQosEvent, test_on_new_event_callback)
354360

355361
TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
356362
{
363+
// rmw_connextdds doesn't support rmw_event_set_callback() interface
364+
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
365+
GTEST_SKIP();
366+
}
367+
357368
auto pub = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
358369
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
359370
auto dummy_cb = [](size_t count_events) {(void)count_events;};
@@ -376,6 +387,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
376387
EXPECT_NO_THROW(
377388
pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS));
378389

390+
EXPECT_NO_THROW(
391+
pub->set_on_new_qos_event_callback(dummy_cb, RCL_PUBLISHER_MATCHED));
392+
393+
EXPECT_NO_THROW(
394+
pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_MATCHED));
395+
379396
EXPECT_NO_THROW(
380397
sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED));
381398

@@ -394,6 +411,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
394411
EXPECT_NO_THROW(
395412
sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS));
396413

414+
EXPECT_NO_THROW(
415+
sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_MATCHED));
416+
417+
EXPECT_NO_THROW(
418+
sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_MATCHED));
419+
397420
std::function<void(size_t)> invalid_cb;
398421

399422
rclcpp::SubscriptionOptions sub_options;
@@ -413,3 +436,170 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
413436
pub->set_on_new_qos_event_callback(invalid_cb, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
414437
std::invalid_argument);
415438
}
439+
440+
TEST_F(TestQosEvent, test_pub_matched_event_by_set_event_callback)
441+
{
442+
// rmw_connextdds doesn't support rmw_event_set_callback() interface
443+
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
444+
GTEST_SKIP();
445+
}
446+
447+
std::atomic_size_t matched_count = 0;
448+
449+
rclcpp::PublisherOptions pub_options;
450+
pub_options.event_callbacks.matched_callback = [](auto) {};
451+
auto pub = node->create_publisher<test_msgs::msg::Empty>(
452+
topic_name, 10, pub_options);
453+
454+
auto matched_event_callback = [&matched_count](size_t count) {
455+
matched_count += count;
456+
};
457+
458+
pub->set_on_new_qos_event_callback(matched_event_callback, RCL_PUBLISHER_MATCHED);
459+
460+
rclcpp::executors::SingleThreadedExecutor ex;
461+
ex.add_node(node->get_node_base_interface());
462+
463+
const auto timeout = std::chrono::milliseconds(200);
464+
465+
{
466+
auto sub1 = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
467+
ex.spin_some(timeout);
468+
EXPECT_EQ(matched_count, static_cast<size_t>(1));
469+
470+
{
471+
auto sub2 = node->create_subscription<test_msgs::msg::Empty>(
472+
topic_name, 10, message_callback);
473+
ex.spin_some(timeout);
474+
EXPECT_EQ(matched_count, static_cast<size_t>(2));
475+
}
476+
ex.spin_some(timeout);
477+
EXPECT_EQ(matched_count, static_cast<size_t>(3));
478+
}
479+
480+
ex.spin_some(timeout);
481+
EXPECT_EQ(matched_count, static_cast<size_t>(4));
482+
}
483+
484+
TEST_F(TestQosEvent, test_sub_matched_event_by_set_event_callback)
485+
{
486+
// rmw_connextdds doesn't support rmw_event_set_callback() interface
487+
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
488+
GTEST_SKIP();
489+
}
490+
491+
std::atomic_size_t matched_count = 0;
492+
493+
rclcpp::SubscriptionOptions sub_options;
494+
sub_options.event_callbacks.matched_callback = [](auto) {};
495+
auto sub = node->create_subscription<test_msgs::msg::Empty>(
496+
topic_name, 10, message_callback, sub_options);
497+
498+
auto matched_event_callback = [&matched_count](size_t count) {
499+
matched_count += count;
500+
};
501+
502+
sub->set_on_new_qos_event_callback(matched_event_callback, RCL_SUBSCRIPTION_MATCHED);
503+
504+
rclcpp::executors::SingleThreadedExecutor ex;
505+
ex.add_node(node->get_node_base_interface());
506+
507+
const auto timeout = std::chrono::milliseconds(200);
508+
509+
{
510+
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
511+
512+
ex.spin_some(timeout);
513+
EXPECT_EQ(matched_count, static_cast<size_t>(1));
514+
515+
{
516+
auto pub2 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
517+
ex.spin_some(timeout);
518+
EXPECT_EQ(matched_count, static_cast<size_t>(2));
519+
}
520+
521+
ex.spin_some(timeout);
522+
EXPECT_EQ(matched_count, static_cast<size_t>(3));
523+
}
524+
525+
ex.spin_some(timeout);
526+
EXPECT_EQ(matched_count, static_cast<size_t>(4));
527+
}
528+
529+
TEST_F(TestQosEvent, test_pub_matched_event_by_option_event_callback)
530+
{
531+
rmw_matched_status_t matched_expected_result;
532+
533+
rclcpp::PublisherOptions pub_options;
534+
pub_options.event_callbacks.matched_callback =
535+
[&matched_expected_result](rmw_matched_status_t & s) {
536+
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
537+
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
538+
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
539+
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
540+
};
541+
542+
auto pub = node->create_publisher<test_msgs::msg::Empty>(
543+
topic_name, 10, pub_options);
544+
545+
rclcpp::executors::SingleThreadedExecutor ex;
546+
ex.add_node(node->get_node_base_interface());
547+
548+
// Create a connected subscription
549+
matched_expected_result.total_count = 1;
550+
matched_expected_result.total_count_change = 1;
551+
matched_expected_result.current_count = 1;
552+
matched_expected_result.current_count_change = 1;
553+
554+
const auto timeout = std::chrono::milliseconds(200);
555+
556+
{
557+
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
558+
ex.spin_some(timeout);
559+
560+
// destroy a connected subscription
561+
matched_expected_result.total_count = 1;
562+
matched_expected_result.total_count_change = 0;
563+
matched_expected_result.current_count = 0;
564+
matched_expected_result.current_count_change = -1;
565+
}
566+
ex.spin_some(timeout);
567+
}
568+
569+
TEST_F(TestQosEvent, test_sub_matched_event_by_option_event_callback)
570+
{
571+
rmw_matched_status_t matched_expected_result;
572+
573+
rclcpp::SubscriptionOptions sub_options;
574+
sub_options.event_callbacks.matched_callback =
575+
[&matched_expected_result](rmw_matched_status_t & s) {
576+
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
577+
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
578+
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
579+
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
580+
};
581+
auto sub = node->create_subscription<test_msgs::msg::Empty>(
582+
topic_name, 10, message_callback, sub_options);
583+
584+
rclcpp::executors::SingleThreadedExecutor ex;
585+
ex.add_node(node->get_node_base_interface());
586+
587+
// Create a connected publisher
588+
matched_expected_result.total_count = 1;
589+
matched_expected_result.total_count_change = 1;
590+
matched_expected_result.current_count = 1;
591+
matched_expected_result.current_count_change = 1;
592+
593+
const auto timeout = std::chrono::milliseconds(200);
594+
{
595+
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
596+
ex.spin_some(timeout);
597+
598+
// destroy a connected publisher
599+
matched_expected_result.total_count = 1;
600+
matched_expected_result.total_count_change = 0;
601+
matched_expected_result.current_count = 0;
602+
matched_expected_result.current_count_change = -1;
603+
}
604+
ex.spin_some(timeout);
605+
}

0 commit comments

Comments
 (0)