Skip to content

Commit d4dbb64

Browse files
author
iRobot ROS
authored
Merge pull request #64 from alsora/asoragna/waitable-listeners
Complete support for rmw listener APIs
2 parents bb09b5c + 79dba0a commit d4dbb64

20 files changed

+883
-34
lines changed

rclcpp/include/rclcpp/client.hpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <future>
2020
#include <map>
2121
#include <memory>
22+
#include <mutex>
2223
#include <sstream>
2324
#include <string>
2425
#include <tuple>
@@ -165,6 +166,8 @@ class ClientBase
165166
*
166167
* Calling it again will clear any previously set callback.
167168
*
169+
* An exception will be thrown if the callback is not callable.
170+
*
168171
* This function is thread-safe.
169172
*
170173
* If you want more information available in the callback, like the client
@@ -178,6 +181,12 @@ class ClientBase
178181
void
179182
set_on_new_response_callback(std::function<void(size_t)> callback)
180183
{
184+
if (!callback) {
185+
throw std::invalid_argument(
186+
"The callback passed to set_on_new_response_callback "
187+
"is not callable.");
188+
}
189+
181190
auto new_callback =
182191
[callback, this](size_t number_of_responses) {
183192
try {
@@ -198,6 +207,8 @@ class ClientBase
198207
}
199208
};
200209

210+
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
211+
201212
// Set it temporarily to the new callback, while we replace the old one.
202213
// This two-step setting, prevents a gap where the old std::function has
203214
// been replaced but the middleware hasn't been told about the new one yet.
@@ -214,6 +225,15 @@ class ClientBase
214225
static_cast<const void *>(&on_new_response_callback_));
215226
}
216227

228+
/// Unset the callback registered for new responses, if any.
229+
void
230+
clear_on_new_response_callback()
231+
{
232+
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
233+
set_on_new_response_callback(nullptr, nullptr);
234+
on_new_response_callback_ = nullptr;
235+
}
236+
217237
protected:
218238
RCLCPP_DISABLE_COPY(ClientBase)
219239

@@ -242,7 +262,8 @@ class ClientBase
242262

243263
std::atomic<bool> in_use_by_wait_set_{false};
244264

245-
std::function<void(size_t)> on_new_response_callback_;
265+
std::recursive_mutex reentrant_mutex_;
266+
std::function<void(size_t)> on_new_response_callback_{nullptr};
246267
};
247268

248269
template<typename ServiceT>

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
148148
provide_intra_process_message(ConstMessageSharedPtr message)
149149
{
150150
buffer_->add_shared(std::move(message));
151+
invoke_callback();
151152
trigger_guard_condition();
152153
}
153154

154155
void
155156
provide_intra_process_message(MessageUniquePtr message)
156157
{
157158
buffer_->add_unique(std::move(message));
159+
invoke_callback();
158160
trigger_guard_condition();
159161
}
160162

@@ -165,6 +167,17 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
165167
}
166168

167169
private:
170+
void
171+
invoke_callback()
172+
{
173+
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
174+
if (on_new_message_callback_) {
175+
on_new_message_callback_(1);
176+
} else {
177+
unread_count_++;
178+
}
179+
}
180+
168181
void
169182
trigger_guard_condition()
170183
{

rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
1616
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
1717

18-
#include <rmw/rmw.h>
19-
20-
#include <functional>
18+
#include <algorithm>
2119
#include <memory>
2220
#include <mutex>
2321
#include <string>
2422
#include <utility>
2523

2624
#include "rcl/error_handling.h"
25+
#include "rmw/impl/cpp/demangle.hpp"
2726

27+
#include "rclcpp/logging.hpp"
2828
#include "rclcpp/type_support_decl.hpp"
2929
#include "rclcpp/waitable.hpp"
3030

@@ -38,6 +38,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
3838
public:
3939
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)
4040

41+
enum class EntityType
42+
{
43+
Subscription,
44+
};
45+
4146
RCLCPP_PUBLIC
4247
SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile)
4348
: topic_name_(topic_name), qos_profile_(qos_profile)
@@ -74,8 +79,87 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
7479
rmw_qos_profile_t
7580
get_actual_qos() const;
7681

82+
/// Set a callback to be called when each new message arrives.
83+
/**
84+
* The callback receives a size_t which is the number of messages received
85+
* since the last time this callback was called.
86+
* Normally this is 1, but can be > 1 if messages were received before any
87+
* callback was set.
88+
*
89+
* The callback also receives an int identifier argument.
90+
* This is needed because a Waitable may be composed of several distinct entities,
91+
* such as subscriptions, services, etc.
92+
* The application should provide a generic callback function that will be then
93+
* forwarded by the waitable to all of its entities.
94+
* Before forwarding, a different value for the identifier argument will be
95+
* bond to the function.
96+
* This implies that the provided callback can use the identifier to behave
97+
* differently depending on which entity triggered the waitable to become ready.
98+
*
99+
* Calling it again will clear any previously set callback.
100+
*
101+
* An exception will be thrown if the callback is not callable.
102+
*
103+
* This function is thread-safe.
104+
*
105+
* If you want more information available in the callback, like the subscription
106+
* or other information, you may use a lambda with captures or std::bind.
107+
*
108+
* \param[in] callback functor to be called when a new message is received.
109+
*/
110+
void
111+
set_on_ready_callback(std::function<void(size_t, int)> callback) override
112+
{
113+
if (!callback) {
114+
throw std::invalid_argument(
115+
"The callback passed to set_on_ready_callback "
116+
"is not callable.");
117+
}
118+
119+
// Note: we bind the int identifier argument to this waitable's entity types
120+
auto new_callback =
121+
[callback, this](size_t number_of_events) {
122+
try {
123+
callback(number_of_events, static_cast<int>(EntityType::Subscription));
124+
} catch (const std::exception & exception) {
125+
RCLCPP_ERROR_STREAM(
126+
// TODO(wjwwood): get this class access to the node logger it is associated with
127+
rclcpp::get_logger("rclcpp"),
128+
"rclcpp::SubscriptionIntraProcessBase@" << this <<
129+
" caught " << rmw::impl::cpp::demangle(exception) <<
130+
" exception in user-provided callback for the 'on ready' callback: " <<
131+
exception.what());
132+
} catch (...) {
133+
RCLCPP_ERROR_STREAM(
134+
rclcpp::get_logger("rclcpp"),
135+
"rclcpp::SubscriptionIntraProcessBase@" << this <<
136+
" caught unhandled exception in user-provided callback " <<
137+
"for the 'on ready' callback");
138+
}
139+
};
140+
141+
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
142+
on_new_message_callback_ = new_callback;
143+
144+
if (unread_count_ > 0) {
145+
// Use qos profile depth as upper bound for unread_count_
146+
on_new_message_callback_(std::min(unread_count_, qos_profile_.depth));
147+
unread_count_ = 0;
148+
}
149+
}
150+
151+
/// Unset the callback registered for new messages, if any.
152+
void
153+
clear_on_ready_callback() override
154+
{
155+
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
156+
on_new_message_callback_ = nullptr;
157+
}
158+
77159
protected:
78160
std::recursive_mutex reentrant_mutex_;
161+
std::function<void(size_t)> on_new_message_callback_ {nullptr};
162+
size_t unread_count_{0};
79163
rcl_guard_condition_t gc_;
80164

81165
private:

rclcpp/include/rclcpp/publisher_base.hpp

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <memory>
2424
#include <sstream>
2525
#include <string>
26+
#include <unordered_map>
27+
#include <utility>
2628
#include <vector>
2729

2830
#include "rcl/publisher.h"
@@ -109,9 +111,10 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
109111
get_publisher_handle() const;
110112

111113
/// Get all the QoS event handlers associated with this publisher.
112-
/** \return The vector of QoS event handlers. */
114+
/** \return The map of QoS event handlers. */
113115
RCLCPP_PUBLIC
114-
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
116+
const
117+
std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
115118
get_event_handlers() const;
116119

117120
/// Get subscription count
@@ -193,6 +196,71 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
193196
uint64_t intra_process_publisher_id,
194197
IntraProcessManagerSharedPtr ipm);
195198

199+
/// Set a callback to be called when each new qos event instance occurs.
200+
/**
201+
* The callback receives a size_t which is the number of events that occurred
202+
* since the last time this callback was called.
203+
* Normally this is 1, but can be > 1 if events occurred before any
204+
* callback was set.
205+
*
206+
* Since this callback is called from the middleware, you should aim to make
207+
* it fast and not blocking.
208+
* If you need to do a lot of work or wait for some other event, you should
209+
* spin it off to another thread, otherwise you risk blocking the middleware.
210+
*
211+
* Calling it again will clear any previously set callback.
212+
*
213+
* An exception will be thrown if the callback is not callable.
214+
*
215+
* This function is thread-safe.
216+
*
217+
* If you want more information available in the callback, like the qos event
218+
* or other information, you may use a lambda with captures or std::bind.
219+
*
220+
* \sa rclcpp::QOSEventHandlerBase::set_on_ready_callback
221+
*
222+
* \param[in] callback functor to be called when a new event occurs
223+
* \param[in] event_type identifier for the qos event we want to attach the callback to
224+
*/
225+
void
226+
set_on_new_qos_event_callback(
227+
std::function<void(size_t)> callback,
228+
rcl_publisher_event_type_t event_type)
229+
{
230+
if (event_handlers_.count(event_type) == 0) {
231+
RCLCPP_WARN(
232+
rclcpp::get_logger("rclcpp"),
233+
"Calling set_on_new_qos_event_callback for non registered event_type");
234+
return;
235+
}
236+
237+
if (!callback) {
238+
throw std::invalid_argument(
239+
"The callback passed to set_on_new_qos_event_callback "
240+
"is not callable.");
241+
}
242+
243+
// The on_ready_callback signature has an extra `int` argument used to disambiguate between
244+
// possible different entities within a generic waitable.
245+
// We hide that detail to users of this method.
246+
std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
247+
event_handlers_[event_type]->set_on_ready_callback(new_callback);
248+
}
249+
250+
/// Unset the callback registered for new qos events, if any.
251+
void
252+
clear_on_new_qos_event_callback(rcl_publisher_event_type_t event_type)
253+
{
254+
if (event_handlers_.count(event_type) == 0) {
255+
RCLCPP_WARN(
256+
rclcpp::get_logger("rclcpp"),
257+
"Calling clear_on_new_qos_event_callback for non registered event_type");
258+
return;
259+
}
260+
261+
event_handlers_[event_type]->clear_on_ready_callback();
262+
}
263+
196264
protected:
197265
template<typename EventCallbackT>
198266
void
@@ -206,7 +274,7 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
206274
rcl_publisher_event_init,
207275
publisher_handle_,
208276
event_type);
209-
event_handlers_.emplace_back(handler);
277+
event_handlers_.insert(std::make_pair(event_type, handler));
210278
}
211279

212280
RCLCPP_PUBLIC
@@ -216,7 +284,8 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
216284

217285
std::shared_ptr<rcl_publisher_t> publisher_handle_;
218286

219-
std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
287+
std::unordered_map<rcl_publisher_event_type_t,
288+
std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
220289

221290
using IntraProcessManagerWeakPtr =
222291
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;

0 commit comments

Comments
 (0)