Skip to content

Commit 6c4afb3

Browse files
authored
Hook up the incompatible type event inside of rclcpp (#2069)
* Rename QOSEventHandler* to EventHandler. We are going to be using it for more than just QOS events, so rename it to just EventHandler and EventHandlerBase for now. Leave the old names in place but deprecated. * Rename qos_event.hpp -> event_handler.hpp * Revamp incompatible_qos callback setting. * Add in incompatible type implementation. Signed-off-by: Chris Lalancette <[email protected]>
1 parent b6a803f commit 6c4afb3

13 files changed

+437
-327
lines changed

rclcpp/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ set(${PROJECT_NAME}_SRCS
9393
src/rclcpp/parameter_value.cpp
9494
src/rclcpp/publisher_base.cpp
9595
src/rclcpp/qos.cpp
96-
src/rclcpp/qos_event.cpp
96+
src/rclcpp/event_handler.cpp
9797
src/rclcpp/qos_overriding_options.cpp
9898
src/rclcpp/serialization.cpp
9999
src/rclcpp/serialized_message.cpp
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
// Copyright 2019 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLCPP__EVENT_HANDLER_HPP_
16+
#define RCLCPP__EVENT_HANDLER_HPP_
17+
18+
#include <functional>
19+
#include <memory>
20+
#include <mutex>
21+
#include <stdexcept>
22+
#include <string>
23+
24+
#include "rcl/error_handling.h"
25+
#include "rcl/event_callback.h"
26+
#include "rmw/impl/cpp/demangle.hpp"
27+
#include "rmw/incompatible_qos_events_statuses.h"
28+
#include "rmw/events_statuses/incompatible_type.h"
29+
30+
#include "rcutils/logging_macros.h"
31+
32+
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
33+
#include "rclcpp/exceptions.hpp"
34+
#include "rclcpp/function_traits.hpp"
35+
#include "rclcpp/logging.hpp"
36+
#include "rclcpp/waitable.hpp"
37+
38+
namespace rclcpp
39+
{
40+
41+
using QOSDeadlineRequestedInfo = rmw_requested_deadline_missed_status_t;
42+
using QOSDeadlineOfferedInfo = rmw_offered_deadline_missed_status_t;
43+
using QOSLivelinessChangedInfo = rmw_liveliness_changed_status_t;
44+
using QOSLivelinessLostInfo = rmw_liveliness_lost_status_t;
45+
using QOSMessageLostInfo = rmw_message_lost_status_t;
46+
using QOSOfferedIncompatibleQoSInfo = rmw_offered_qos_incompatible_event_status_t;
47+
using QOSRequestedIncompatibleQoSInfo = rmw_requested_qos_incompatible_event_status_t;
48+
49+
using IncompatibleTypeInfo = rmw_incompatible_type_status_t;
50+
51+
using QOSDeadlineRequestedCallbackType = std::function<void (QOSDeadlineRequestedInfo &)>;
52+
using QOSDeadlineOfferedCallbackType = std::function<void (QOSDeadlineOfferedInfo &)>;
53+
using QOSLivelinessChangedCallbackType = std::function<void (QOSLivelinessChangedInfo &)>;
54+
using QOSLivelinessLostCallbackType = std::function<void (QOSLivelinessLostInfo &)>;
55+
using QOSMessageLostCallbackType = std::function<void (QOSMessageLostInfo &)>;
56+
using QOSOfferedIncompatibleQoSCallbackType = std::function<void (QOSOfferedIncompatibleQoSInfo &)>;
57+
using QOSRequestedIncompatibleQoSCallbackType =
58+
std::function<void (QOSRequestedIncompatibleQoSInfo &)>;
59+
60+
using IncompatibleTypeCallbackType = std::function<void (IncompatibleTypeInfo &)>;
61+
62+
/// Contains callbacks for various types of events a Publisher can receive from the middleware.
63+
struct PublisherEventCallbacks
64+
{
65+
QOSDeadlineOfferedCallbackType deadline_callback;
66+
QOSLivelinessLostCallbackType liveliness_callback;
67+
QOSOfferedIncompatibleQoSCallbackType incompatible_qos_callback;
68+
IncompatibleTypeCallbackType incompatible_type_callback;
69+
};
70+
71+
/// Contains callbacks for non-message events that a Subscription can receive from the middleware.
72+
struct SubscriptionEventCallbacks
73+
{
74+
QOSDeadlineRequestedCallbackType deadline_callback;
75+
QOSLivelinessChangedCallbackType liveliness_callback;
76+
QOSRequestedIncompatibleQoSCallbackType incompatible_qos_callback;
77+
QOSMessageLostCallbackType message_lost_callback;
78+
IncompatibleTypeCallbackType incompatible_type_callback;
79+
};
80+
81+
class UnsupportedEventTypeException : public exceptions::RCLErrorBase, public std::runtime_error
82+
{
83+
public:
84+
RCLCPP_PUBLIC
85+
UnsupportedEventTypeException(
86+
rcl_ret_t ret,
87+
const rcl_error_state_t * error_state,
88+
const std::string & prefix);
89+
90+
RCLCPP_PUBLIC
91+
UnsupportedEventTypeException(
92+
const exceptions::RCLErrorBase & base_exc,
93+
const std::string & prefix);
94+
};
95+
96+
class EventHandlerBase : public Waitable
97+
{
98+
public:
99+
enum class EntityType : std::size_t
100+
{
101+
Event,
102+
};
103+
104+
RCLCPP_PUBLIC
105+
virtual ~EventHandlerBase();
106+
107+
/// Get the number of ready events
108+
RCLCPP_PUBLIC
109+
size_t
110+
get_number_of_ready_events() override;
111+
112+
/// Add the Waitable to a wait set.
113+
RCLCPP_PUBLIC
114+
void
115+
add_to_wait_set(rcl_wait_set_t * wait_set) override;
116+
117+
/// Check if the Waitable is ready.
118+
RCLCPP_PUBLIC
119+
bool
120+
is_ready(rcl_wait_set_t * wait_set) override;
121+
122+
/// Set a callback to be called when each new event instance occurs.
123+
/**
124+
* The callback receives a size_t which is the number of events that occurred
125+
* since the last time this callback was called.
126+
* Normally this is 1, but can be > 1 if events occurred before any
127+
* callback was set.
128+
*
129+
* The callback also receives an int identifier argument.
130+
* This is needed because a Waitable may be composed of several distinct entities,
131+
* such as subscriptions, services, etc.
132+
* The application should provide a generic callback function that will be then
133+
* forwarded by the waitable to all of its entities.
134+
* Before forwarding, a different value for the identifier argument will be
135+
* bond to the function.
136+
* This implies that the provided callback can use the identifier to behave
137+
* differently depending on which entity triggered the waitable to become ready.
138+
*
139+
* Since this callback is called from the middleware, you should aim to make
140+
* it fast and not blocking.
141+
* If you need to do a lot of work or wait for some other event, you should
142+
* spin it off to another thread, otherwise you risk blocking the middleware.
143+
*
144+
* Calling it again will clear any previously set callback.
145+
*
146+
* An exception will be thrown if the callback is not callable.
147+
*
148+
* This function is thread-safe.
149+
*
150+
* If you want more information available in the callback, like the qos event
151+
* or other information, you may use a lambda with captures or std::bind.
152+
*
153+
* \sa rmw_event_set_callback
154+
* \sa rcl_event_set_callback
155+
*
156+
* \param[in] callback functor to be called when a new event occurs
157+
*/
158+
void
159+
set_on_ready_callback(std::function<void(size_t, int)> callback) override
160+
{
161+
if (!callback) {
162+
throw std::invalid_argument(
163+
"The callback passed to set_on_ready_callback "
164+
"is not callable.");
165+
}
166+
167+
// Note: we bind the int identifier argument to this waitable's entity types
168+
auto new_callback =
169+
[callback, this](size_t number_of_events) {
170+
try {
171+
callback(number_of_events, static_cast<int>(EntityType::Event));
172+
} catch (const std::exception & exception) {
173+
RCLCPP_ERROR_STREAM(
174+
// TODO(wjwwood): get this class access to the node logger it is associated with
175+
rclcpp::get_logger("rclcpp"),
176+
"rclcpp::EventHandlerBase@" << this <<
177+
" caught " << rmw::impl::cpp::demangle(exception) <<
178+
" exception in user-provided callback for the 'on ready' callback: " <<
179+
exception.what());
180+
} catch (...) {
181+
RCLCPP_ERROR_STREAM(
182+
rclcpp::get_logger("rclcpp"),
183+
"rclcpp::EventHandlerBase@" << this <<
184+
" caught unhandled exception in user-provided callback " <<
185+
"for the 'on ready' callback");
186+
}
187+
};
188+
189+
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
190+
191+
// Set it temporarily to the new callback, while we replace the old one.
192+
// This two-step setting, prevents a gap where the old std::function has
193+
// been replaced but the middleware hasn't been told about the new one yet.
194+
set_on_new_event_callback(
195+
rclcpp::detail::cpp_callback_trampoline<decltype(new_callback), const void *, size_t>,
196+
static_cast<const void *>(&new_callback));
197+
198+
// Store the std::function to keep it in scope, also overwrites the existing one.
199+
on_new_event_callback_ = new_callback;
200+
201+
// Set it again, now using the permanent storage.
202+
set_on_new_event_callback(
203+
rclcpp::detail::cpp_callback_trampoline<
204+
decltype(on_new_event_callback_), const void *, size_t>,
205+
static_cast<const void *>(&on_new_event_callback_));
206+
}
207+
208+
/// Unset the callback registered for new events, if any.
209+
void
210+
clear_on_ready_callback() override
211+
{
212+
std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
213+
if (on_new_event_callback_) {
214+
set_on_new_event_callback(nullptr, nullptr);
215+
on_new_event_callback_ = nullptr;
216+
}
217+
}
218+
219+
protected:
220+
RCLCPP_PUBLIC
221+
void
222+
set_on_new_event_callback(rcl_event_callback_t callback, const void * user_data);
223+
224+
std::recursive_mutex callback_mutex_;
225+
std::function<void(size_t)> on_new_event_callback_{nullptr};
226+
227+
rcl_event_t event_handle_;
228+
size_t wait_set_event_index_;
229+
};
230+
231+
using QOSEventHandlerBase [[deprecated("Use rclcpp::EventHandlerBase")]] = EventHandlerBase;
232+
233+
template<typename EventCallbackT, typename ParentHandleT>
234+
class EventHandler : public EventHandlerBase
235+
{
236+
public:
237+
template<typename InitFuncT, typename EventTypeEnum>
238+
EventHandler(
239+
const EventCallbackT & callback,
240+
InitFuncT init_func,
241+
ParentHandleT parent_handle,
242+
EventTypeEnum event_type)
243+
: parent_handle_(parent_handle), event_callback_(callback)
244+
{
245+
event_handle_ = rcl_get_zero_initialized_event();
246+
rcl_ret_t ret = init_func(&event_handle_, parent_handle.get(), event_type);
247+
if (ret != RCL_RET_OK) {
248+
if (ret == RCL_RET_UNSUPPORTED) {
249+
UnsupportedEventTypeException exc(ret, rcl_get_error_state(), "Failed to initialize event");
250+
rcl_reset_error();
251+
throw exc;
252+
} else {
253+
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to initialize event");
254+
}
255+
}
256+
}
257+
258+
/// Take data so that the callback cannot be scheduled again
259+
std::shared_ptr<void>
260+
take_data() override
261+
{
262+
EventCallbackInfoT callback_info;
263+
rcl_ret_t ret = rcl_take_event(&event_handle_, &callback_info);
264+
if (ret != RCL_RET_OK) {
265+
RCUTILS_LOG_ERROR_NAMED(
266+
"rclcpp",
267+
"Couldn't take event info: %s", rcl_get_error_string().str);
268+
return nullptr;
269+
}
270+
return std::static_pointer_cast<void>(std::make_shared<EventCallbackInfoT>(callback_info));
271+
}
272+
273+
std::shared_ptr<void>
274+
take_data_by_entity_id(size_t id) override
275+
{
276+
(void)id;
277+
return take_data();
278+
}
279+
280+
/// Execute any entities of the Waitable that are ready.
281+
void
282+
execute(std::shared_ptr<void> & data) override
283+
{
284+
if (!data) {
285+
throw std::runtime_error("'data' is empty");
286+
}
287+
auto callback_ptr = std::static_pointer_cast<EventCallbackInfoT>(data);
288+
event_callback_(*callback_ptr);
289+
callback_ptr.reset();
290+
}
291+
292+
private:
293+
using EventCallbackInfoT = typename std::remove_reference<typename
294+
rclcpp::function_traits::function_traits<EventCallbackT>::template argument_type<0>>::type;
295+
296+
ParentHandleT parent_handle_;
297+
EventCallbackT event_callback_;
298+
};
299+
300+
template<typename EventCallbackT, typename ParentHandleT>
301+
using QOSEventHandler [[deprecated("Use rclcpp::EventHandler")]] = EventHandler<EventCallbackT,
302+
ParentHandleT>;
303+
304+
} // namespace rclcpp
305+
306+
#endif // RCLCPP__EVENT_HANDLER_HPP_

rclcpp/include/rclcpp/publisher_base.hpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
#include "rclcpp/macros.hpp"
3434
#include "rclcpp/network_flow_endpoint.hpp"
3535
#include "rclcpp/qos.hpp"
36-
#include "rclcpp/qos_event.hpp"
36+
#include "rclcpp/event_handler.hpp"
3737
#include "rclcpp/type_support_decl.hpp"
3838
#include "rclcpp/visibility_control.hpp"
3939
#include "rcpputils/time.hpp"
@@ -124,7 +124,7 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
124124
/** \return The map of QoS event handlers. */
125125
RCLCPP_PUBLIC
126126
const
127-
std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
127+
std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
128128
get_event_handlers() const;
129129

130130
/// Get subscription count
@@ -276,7 +276,7 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
276276
* If you want more information available in the callback, like the qos event
277277
* or other information, you may use a lambda with captures or std::bind.
278278
*
279-
* \sa rclcpp::QOSEventHandlerBase::set_on_ready_callback
279+
* \sa rclcpp::EventHandlerBase::set_on_ready_callback
280280
*
281281
* \param[in] callback functor to be called when a new event occurs
282282
* \param[in] event_type identifier for the qos event we want to attach the callback to
@@ -327,7 +327,7 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
327327
const EventCallbackT & callback,
328328
const rcl_publisher_event_type_t event_type)
329329
{
330-
auto handler = std::make_shared<QOSEventHandler<EventCallbackT,
330+
auto handler = std::make_shared<EventHandler<EventCallbackT,
331331
std::shared_ptr<rcl_publisher_t>>>(
332332
callback,
333333
rcl_publisher_event_init,
@@ -339,12 +339,15 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
339339
RCLCPP_PUBLIC
340340
void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & info) const;
341341

342+
RCLCPP_PUBLIC
343+
void default_incompatible_type_callback(IncompatibleTypeInfo & info) const;
344+
342345
std::shared_ptr<rcl_node_t> rcl_node_handle_;
343346

344347
std::shared_ptr<rcl_publisher_t> publisher_handle_;
345348

346349
std::unordered_map<rcl_publisher_event_type_t,
347-
std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
350+
std::shared_ptr<rclcpp::EventHandlerBase>> event_handlers_;
348351

349352
using IntraProcessManagerWeakPtr =
350353
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;

rclcpp/include/rclcpp/publisher_options.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
#include "rclcpp/detail/rmw_implementation_specific_publisher_payload.hpp"
2727
#include "rclcpp/intra_process_setting.hpp"
2828
#include "rclcpp/qos.hpp"
29-
#include "rclcpp/qos_event.hpp"
29+
#include "rclcpp/event_handler.hpp"
3030
#include "rclcpp/qos_overriding_options.hpp"
3131

3232
namespace rclcpp

0 commit comments

Comments
 (0)