Skip to content

Commit 476238b

Browse files
committed
expose qos listener APIs from pub and sub; add unit-tests
Signed-off-by: Alberto Soragna <[email protected]>
1 parent 01c2111 commit 476238b

File tree

9 files changed

+230
-18
lines changed

9 files changed

+230
-18
lines changed

rclcpp/include/rclcpp/publisher_base.hpp

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <memory>
2424
#include <sstream>
2525
#include <string>
26+
#include <unordered_map>
27+
#include <utility>
2628
#include <vector>
2729

2830
#include "rcl/publisher.h"
@@ -109,9 +111,10 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
109111
get_publisher_handle() const;
110112

111113
/// Get all the QoS event handlers associated with this publisher.
112-
/** \return The vector of QoS event handlers. */
114+
/** \return The map of QoS event handlers. */
113115
RCLCPP_PUBLIC
114-
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
116+
const
117+
std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
115118
get_event_handlers() const;
116119

117120
/// Get subscription count
@@ -193,6 +196,71 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
193196
uint64_t intra_process_publisher_id,
194197
IntraProcessManagerSharedPtr ipm);
195198

199+
/// Set a callback to be called when each new qos event instance occurs.
200+
/**
201+
* The callback receives a size_t which is the number of events that occurred
202+
* since the last time this callback was called.
203+
* Normally this is 1, but can be > 1 if events occurred before any
204+
* callback was set.
205+
*
206+
* Since this callback is called from the middleware, you should aim to make
207+
* it fast and not blocking.
208+
* If you need to do a lot of work or wait for some other event, you should
209+
* spin it off to another thread, otherwise you risk blocking the middleware.
210+
*
211+
* Calling it again will clear any previously set callback.
212+
*
213+
* An exception will be thrown if the callback is not callable.
214+
*
215+
* This function is thread-safe.
216+
*
217+
* If you want more information available in the callback, like the qos event
218+
* or other information, you may use a lambda with captures or std::bind.
219+
*
220+
* \sa rclcpp::QOSEventHandlerBase::set_on_ready_callback
221+
*
222+
* \param[in] callback functor to be called when a new event occurs
223+
* \param[in] event_type identifier for the qos event we want to attach the callback to
224+
*/
225+
void
226+
set_on_new_qos_event_callback(
227+
std::function<void(size_t)> callback,
228+
rcl_publisher_event_type_t event_type)
229+
{
230+
if (event_handlers_.count(event_type) == 0) {
231+
RCLCPP_WARN(
232+
rclcpp::get_logger("rclcpp"),
233+
"Calling set_on_new_qos_event_callback for non registered event_type");
234+
return;
235+
}
236+
237+
if (!callback) {
238+
throw std::invalid_argument(
239+
"The callback passed to set_on_new_qos_event_callback "
240+
"is not callable.");
241+
}
242+
243+
// The on_ready_callback signature has an extra `int` argument used to disambiguate between
244+
// possible different entities within a generic waitable.
245+
// We hide that detail to users of this method.
246+
std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
247+
event_handlers_[event_type]->set_on_ready_callback(new_callback);
248+
}
249+
250+
/// Unset the callback registered for new qos events, if any.
251+
void
252+
clear_on_new_qos_event_callback(rcl_publisher_event_type_t event_type)
253+
{
254+
if (event_handlers_.count(event_type) == 0) {
255+
RCLCPP_WARN(
256+
rclcpp::get_logger("rclcpp"),
257+
"Calling clear_on_new_qos_event_callback for non registered event_type");
258+
return;
259+
}
260+
261+
event_handlers_[event_type]->clear_on_ready_callback();
262+
}
263+
196264
protected:
197265
template<typename EventCallbackT>
198266
void
@@ -206,7 +274,7 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
206274
rcl_publisher_event_init,
207275
publisher_handle_,
208276
event_type);
209-
event_handlers_.emplace_back(handler);
277+
event_handlers_.insert(std::make_pair(event_type, handler));
210278
}
211279

212280
RCLCPP_PUBLIC
@@ -216,7 +284,8 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
216284

217285
std::shared_ptr<rcl_publisher_t> publisher_handle_;
218286

219-
std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
287+
std::unordered_map<rcl_publisher_event_type_t,
288+
std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
220289

221290
using IntraProcessManagerWeakPtr =
222291
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;

rclcpp/include/rclcpp/subscription_base.hpp

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,10 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
9898
get_subscription_handle() const;
9999

100100
/// Get all the QoS event handlers associated with this subscription.
101-
/** \return The vector of QoS event handlers. */
101+
/** \return The map of QoS event handlers. */
102102
RCLCPP_PUBLIC
103-
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
103+
const
104+
std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
104105
get_event_handlers() const;
105106

106107
/// Get the actual QoS settings, after the defaults have been determined.
@@ -401,6 +402,71 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
401402
subscription_intra_process_->clear_on_ready_callback();
402403
}
403404

405+
/// Set a callback to be called when each new qos event instance occurs.
406+
/**
407+
* The callback receives a size_t which is the number of events that occurred
408+
* since the last time this callback was called.
409+
* Normally this is 1, but can be > 1 if events occurred before any
410+
* callback was set.
411+
*
412+
* Since this callback is called from the middleware, you should aim to make
413+
* it fast and not blocking.
414+
* If you need to do a lot of work or wait for some other event, you should
415+
* spin it off to another thread, otherwise you risk blocking the middleware.
416+
*
417+
* Calling it again will clear any previously set callback.
418+
*
419+
* An exception will be thrown if the callback is not callable.
420+
*
421+
* This function is thread-safe.
422+
*
423+
* If you want more information available in the callback, like the qos event
424+
* or other information, you may use a lambda with captures or std::bind.
425+
*
426+
* \sa rclcpp::QOSEventHandlerBase::set_on_ready_callback
427+
*
428+
* \param[in] callback functor to be called when a new event occurs
429+
* \param[in] event_type identifier for the qos event we want to attach the callback to
430+
*/
431+
void
432+
set_on_new_qos_event_callback(
433+
std::function<void(size_t)> callback,
434+
rcl_subscription_event_type_t event_type)
435+
{
436+
if (event_handlers_.count(event_type) == 0) {
437+
RCLCPP_WARN(
438+
rclcpp::get_logger("rclcpp"),
439+
"Calling set_on_new_qos_event_callback for non registered event_type");
440+
return;
441+
}
442+
443+
if (!callback) {
444+
throw std::invalid_argument(
445+
"The callback passed to set_on_new_qos_event_callback "
446+
"is not callable.");
447+
}
448+
449+
// The on_ready_callback signature has an extra `int` argument used to disambiguate between
450+
// possible different entities within a generic waitable.
451+
// We hide that detail to users of this method.
452+
std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
453+
event_handlers_[event_type]->set_on_ready_callback(new_callback);
454+
}
455+
456+
/// Unset the callback registered for new qos events, if any.
457+
void
458+
clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type)
459+
{
460+
if (event_handlers_.count(event_type) == 0) {
461+
RCLCPP_WARN(
462+
rclcpp::get_logger("rclcpp"),
463+
"Calling clear_on_new_qos_event_callback for non registered event_type");
464+
return;
465+
}
466+
467+
event_handlers_[event_type]->clear_on_ready_callback();
468+
}
469+
404470
protected:
405471
template<typename EventCallbackT>
406472
void
@@ -415,7 +481,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
415481
get_subscription_handle(),
416482
event_type);
417483
qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(), false));
418-
event_handlers_.emplace_back(handler);
484+
event_handlers_.insert(std::make_pair(event_type, handler));
419485
}
420486

421487
RCLCPP_PUBLIC
@@ -436,7 +502,8 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
436502
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
437503
rclcpp::Logger node_logger_;
438504

439-
std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
505+
std::unordered_map<rcl_subscription_event_type_t,
506+
std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
440507

441508
bool use_intra_process_;
442509
IntraProcessManagerWeakPtr weak_ipm_;

rclcpp/include/rclcpp/wait_set_template.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli
155155
this->storage_add_subscription(std::move(local_subscription));
156156
}
157157
if (mask.include_events) {
158-
for (auto event : inner_subscription->get_event_handlers()) {
158+
for (auto key_event_pair : inner_subscription->get_event_handlers()) {
159+
auto event = key_event_pair.second;
159160
auto local_subscription = inner_subscription;
160161
bool already_in_use =
161162
local_subscription->exchange_in_use_by_wait_set_state(event.get(), true);
@@ -225,7 +226,8 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli
225226
this->storage_remove_subscription(std::move(local_subscription));
226227
}
227228
if (mask.include_events) {
228-
for (auto event : inner_subscription->get_event_handlers()) {
229+
for (auto key_event_pair : inner_subscription->get_event_handlers()) {
230+
auto event = key_event_pair.second;
229231
auto local_subscription = inner_subscription;
230232
local_subscription->exchange_in_use_by_wait_set_state(event.get(), false);
231233
this->storage_remove_waitable(std::move(event));

rclcpp/src/rclcpp/node_interfaces/node_topics.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ NodeTopics::add_publisher(
5555
callback_group = node_base_->get_default_callback_group();
5656
}
5757

58-
for (auto & publisher_event : publisher->get_event_handlers()) {
58+
for (auto & key_event_pair : publisher->get_event_handlers()) {
59+
auto publisher_event = key_event_pair.second;
5960
callback_group->add_waitable(publisher_event);
6061
}
6162

@@ -97,7 +98,8 @@ NodeTopics::add_subscription(
9798

9899
callback_group->add_subscription(subscription);
99100

100-
for (auto & subscription_event : subscription->get_event_handlers()) {
101+
for (auto & key_event_pair : subscription->get_event_handlers()) {
102+
auto subscription_event = key_event_pair.second;
101103
callback_group->add_waitable(subscription_event);
102104
}
103105

rclcpp/src/rclcpp/publisher_base.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <mutex>
2323
#include <sstream>
2424
#include <string>
25+
#include <unordered_map>
2526
#include <vector>
2627

2728
#include "rcutils/logging_macros.h"
@@ -153,7 +154,8 @@ PublisherBase::get_publisher_handle() const
153154
return publisher_handle_;
154155
}
155156

156-
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
157+
const
158+
std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
157159
PublisherBase::get_event_handlers() const
158160
{
159161
return event_handlers_;

rclcpp/src/rclcpp/subscription_base.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <cstdio>
1818
#include <memory>
1919
#include <string>
20+
#include <unordered_map>
2021
#include <vector>
2122

2223
#include "rclcpp/exceptions.hpp"
@@ -116,7 +117,8 @@ SubscriptionBase::get_subscription_handle() const
116117
return subscription_handle_;
117118
}
118119

119-
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
120+
const
121+
std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
120122
SubscriptionBase::get_event_handlers() const
121123
{
122124
return event_handlers_;
@@ -282,7 +284,8 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
282284
if (get_intra_process_waitable().get() == pointer_to_subscription_part) {
283285
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
284286
}
285-
for (const auto & qos_event : event_handlers_) {
287+
for (const auto & key_event_pair : event_handlers_) {
288+
auto qos_event = key_event_pair.second;
286289
if (qos_event.get() == pointer_to_subscription_part) {
287290
return qos_events_in_use_by_wait_set_[qos_event.get()].exchange(in_use_state);
288291
}

rclcpp/test/rclcpp/test_publisher.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,8 @@ TEST_F(TestPublisher, run_event_handlers) {
501501
initialize();
502502
auto publisher = node->create_publisher<test_msgs::msg::Empty>("topic", 10);
503503

504-
for (const auto & handler : publisher->get_event_handlers()) {
504+
for (const auto & key_event_pair : publisher->get_event_handlers()) {
505+
auto handler = key_event_pair.second;
505506
std::shared_ptr<void> data = handler->take_data();
506507
handler->execute(data);
507508
}

rclcpp/test/rclcpp/test_qos_event.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
#include "../mocking_utils/patch.hpp"
2929

30+
using namespace std::chrono_literals;
31+
3032
class TestQosEvent : public ::testing::Test
3133
{
3234
protected:
@@ -325,3 +327,67 @@ TEST_F(TestQosEvent, add_to_wait_set) {
325327
EXPECT_THROW(handler.add_to_wait_set(&wait_set), rclcpp::exceptions::RCLError);
326328
}
327329
}
330+
331+
TEST_F(TestQosEvent, test_on_new_event_callback)
332+
{
333+
rclcpp::QoS qos_profile_publisher(10);
334+
qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_VOLATILE);
335+
auto publisher = node->create_publisher<test_msgs::msg::Empty>(
336+
topic_name, qos_profile_publisher);
337+
338+
std::atomic<size_t> c1 {0};
339+
auto increase_c1_cb = [&c1](size_t count_events) {c1 += count_events;};
340+
publisher->set_on_new_qos_event_callback(increase_c1_cb, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
341+
342+
rclcpp::QoS qos_profile_subscription(10);
343+
qos_profile_subscription.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
344+
auto subscription = node->create_subscription<test_msgs::msg::Empty>(
345+
topic_name, qos_profile_subscription, message_callback);
346+
347+
auto start = std::chrono::steady_clock::now();
348+
do {
349+
std::this_thread::sleep_for(100ms);
350+
} while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s);
351+
352+
EXPECT_EQ(c1, 1u);
353+
354+
std::atomic<size_t> c2 {0};
355+
auto increase_c2_cb = [&c2](size_t count_events) {c2 += count_events;};
356+
subscription->set_on_new_qos_event_callback(
357+
increase_c2_cb,
358+
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
359+
360+
start = std::chrono::steady_clock::now();
361+
do {
362+
std::this_thread::sleep_for(100ms);
363+
} while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s);
364+
365+
EXPECT_EQ(c1, 1u);
366+
EXPECT_EQ(c2, 1u);
367+
368+
auto publisher2 = node->create_publisher<test_msgs::msg::Empty>(
369+
topic_name, qos_profile_publisher);
370+
371+
start = std::chrono::steady_clock::now();
372+
do {
373+
std::this_thread::sleep_for(100ms);
374+
} while (c2 == 1u && std::chrono::steady_clock::now() - start < 10s);
375+
376+
EXPECT_EQ(c1, 1u);
377+
EXPECT_EQ(c2, 2u);
378+
379+
publisher->clear_on_new_qos_event_callback(RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
380+
381+
auto subscription2 = node->create_subscription<test_msgs::msg::Empty>(
382+
topic_name, qos_profile_subscription, message_callback);
383+
384+
publisher->set_on_new_qos_event_callback(increase_c1_cb, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
385+
386+
start = std::chrono::steady_clock::now();
387+
do {
388+
std::this_thread::sleep_for(100ms);
389+
} while (c1 == 1 && std::chrono::steady_clock::now() - start < 10s);
390+
391+
EXPECT_EQ(c1, 2u);
392+
EXPECT_EQ(c2, 2u);
393+
}

0 commit comments

Comments
 (0)