diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 561beb5d3e..4ea4f99868 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -50,10 +50,13 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executor.cpp src/rclcpp/executors.cpp src/rclcpp/expand_topic_or_service_name.cpp + src/rclcpp/executors/events_executor.cpp + src/rclcpp/executors/events_executor_entities_collector.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp src/rclcpp/executors/static_executor_entities_collector.cpp src/rclcpp/executors/static_single_threaded_executor.cpp + src/rclcpp/executors/timers_manager.cpp src/rclcpp/future_return_code.cpp src/rclcpp/graph_listener.cpp src/rclcpp/guard_condition.cpp diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 640077a1d9..9b6cb8691f 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -185,6 +185,12 @@ class AnySubscriptionCallback ConstMessageSharedPtr message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), true); + + // If the message is not valid, return. + if(!message) { + return; + } + if (const_shared_ptr_callback_) { const_shared_ptr_callback_(message); } else if (const_shared_ptr_with_info_callback_) { @@ -208,6 +214,12 @@ class AnySubscriptionCallback MessageUniquePtr message, const rclcpp::MessageInfo & message_info) { TRACEPOINT(callback_start, static_cast(this), true); + + // If the message is not valid, return. + if(!message) { + return; + } + if (shared_ptr_callback_) { typename std::shared_ptr shared_message = std::move(message); shared_ptr_callback_(shared_message); diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index bd2d326012..e5e7bd50a3 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -16,6 +16,7 @@ #define RCLCPP__CLIENT_HPP_ #include +#include #include #include #include @@ -50,6 +51,11 @@ namespace node_interfaces class NodeBaseInterface; } // namespace node_interfaces +namespace executors +{ +class EventsExecutor; +} // namespace executors + class ClientBase { public: @@ -150,6 +156,12 @@ class ClientBase bool exchange_in_use_by_wait_set_state(bool in_use_state); + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const; + protected: RCLCPP_DISABLE_COPY(ClientBase) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 946aa18851..9321f679fc 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -544,7 +544,7 @@ class Executor RCLCPP_DISABLE_COPY(Executor) RCLCPP_PUBLIC - void + virtual void spin_once_impl(std::chrono::nanoseconds timeout); typedef std::map #include +#include "rclcpp/executors/events_executor.hpp" #include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/executors/static_single_threaded_executor.hpp" diff --git a/rclcpp/include/rclcpp/executors/event_waitable.hpp b/rclcpp/include/rclcpp/executors/event_waitable.hpp new file mode 100644 index 0000000000..8f4b4d27ca --- /dev/null +++ b/rclcpp/include/rclcpp/executors/event_waitable.hpp @@ -0,0 +1,70 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ + +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides a wrapper around the waitable object, that is + * meant to be used with the EventsExecutor. + * The waitset related methods are stubbed out as they should not be called. + * This class is abstract as the execute method of rclcpp::Waitable is not implemented. + * Nodes who want to implement a custom EventWaitable, can derive from this class and override + * the execute method. + */ +class EventWaitable : public rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventWaitable) + + // Constructor + RCLCPP_PUBLIC + EventWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventWaitable() = default; + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be checked if it's ready"); + return false; + } + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + bool + add_to_wait_set(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be added to a wait_set"); + return false; + } +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp new file mode 100644 index 0000000000..6b5ef5ec11 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -0,0 +1,231 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/executors/events_executor_entities_collector.hpp" +#include "rclcpp/executors/events_executor_notify_waitable.hpp" +#include "rclcpp/executors/timers_manager.hpp" +#include "rclcpp/experimental/buffers/events_queue.hpp" +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" +#include "rclcpp/node.hpp" + +#include "rmw/listener_event_types.h" + +namespace rclcpp +{ +namespace executors +{ + +/// Events executor implementation +/** + * This executor is a variation of the standard one that does not uses a waitset. + * The executor uses an events queue and a timers manager to execute entities from its + * associated nodes and callback groups. + * This provides improved performance as it allows to skip all the waitset maintenance operations. + * + * To run this executor: + * rclcpp::executors::EventsExecutor executor; + * executor.add_node(node); + * executor.spin(); + * executor.remove_node(node); + */ +class EventsExecutor : public rclcpp::Executor +{ + friend class EventsExecutorEntitiesCollector; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor) + + /// Default constructor. See the default constructor for Executor. + /** + * \param[in] events_queue The queue used to store events. + * \param[in] options Options used to configure the executor. + */ + RCLCPP_PUBLIC + explicit EventsExecutor( + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue = std::make_unique< + rclcpp::experimental::buffers::SimpleEventsQueue>(), + const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); + + /// Default destrcutor. + RCLCPP_PUBLIC + virtual ~EventsExecutor() = default; + + /// Events executor implementation of spin. + /** + * This function will block until work comes in, execute it, and keep blocking. + * It will only be interrupted by a CTRL-C (managed by the global signal handler). + * \throws std::runtime_error when spin() called while already spinning + */ + RCLCPP_PUBLIC + void + spin() override; + + /// Events executor implementation of spin some + /** + * executor.provide_callbacks(); + * while(condition) { + * executor.spin_some(); + * } + */ + RCLCPP_PUBLIC + void + spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override; + + RCLCPP_PUBLIC + void + spin_all(std::chrono::nanoseconds max_duration) override; + + /// Add a node to the executor. + /** + * \sa rclcpp::Executor::add_node + */ + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::EventsExecutor::add_node + */ + RCLCPP_PUBLIC + void + add_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Remove a node from the executor. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Add a callback group to an executor. + /** + * \sa rclcpp::Executor::add_callback_group + */ + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Remove callback group from the executor + /** + * \sa rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify = true) override; + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes() override; + +protected: + RCLCPP_PUBLIC + void + spin_once_impl(std::chrono::nanoseconds timeout) override; + +private: + RCLCPP_DISABLE_COPY(EventsExecutor) + + using EventQueue = std::queue; + + // Executor callback: Push new events into the queue and trigger cv. + // This function is called by the DDS entities when an event happened, + // like a subscription receiving a message. + static void + push_event(const void * executor_ptr, rmw_listener_event_t event) + { + // Cast executor_ptr to this (need to remove constness) + auto this_executor = const_cast( + static_cast(executor_ptr)); + + // Event queue mutex scope + { + std::unique_lock lock(this_executor->push_mutex_); + + this_executor->events_queue_->push(event); + } + // Notify that the event queue has some events in it. + this_executor->events_queue_cv_.notify_one(); + } + + /// Extract and execute events from the queue until it is empty + RCLCPP_PUBLIC + void + consume_all_events(EventQueue & queue); + + // Execute a single event + RCLCPP_PUBLIC + void + execute_event(const rmw_listener_event_t & event); + + // Queue where entities can push events + rclcpp::experimental::buffers::EventsQueue::SharedPtr events_queue_; + + EventsExecutorEntitiesCollector::SharedPtr entities_collector_; + EventsExecutorNotifyWaitable::SharedPtr executor_notifier_; + + // Mutex to protect the insertion of events in the queue + std::mutex push_mutex_; + // Variable used to notify when an event is added to the queue + std::condition_variable events_queue_cv_; + // Timers manager + std::shared_ptr timers_manager_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp new file mode 100644 index 0000000000..624980a7b8 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp @@ -0,0 +1,308 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +#include +#include +#include +#include +#include + +#include "rclcpp/executors/event_waitable.hpp" +#include "rclcpp/executors/timers_manager.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" + +namespace rclcpp +{ +namespace executors +{ +typedef std::map> WeakCallbackGroupsToNodesMap; + +// forward declaration of EventsExecutor to avoid circular dependency +class EventsExecutor; + +/** + * @brief This class provides a waitable object that is used for managing the + * entities (i.e. nodes and their subscriptions, timers, services, etc) + * added to an EventsExecutor. + * The add/remove node APIs are used when a node is added/removed from + * the associated EventsExecutor and result in setting/unsetting the + * events callbacks and adding timers to the timers manager. + * + * Being this class derived from Waitable, it can be used to wake up an + * executor thread while it's spinning. + * When this occurs, the execute API takes care of handling changes + * in the entities currently used by the executor. + */ +class EventsExecutorEntitiesCollector final + : public EventWaitable, + public std::enable_shared_from_this +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorEntitiesCollector) + + // Constructor + RCLCPP_PUBLIC + EventsExecutorEntitiesCollector( + EventsExecutor * executor); + + // Destructor + RCLCPP_PUBLIC + ~EventsExecutorEntitiesCollector(); + + // Initialize entities collector + RCLCPP_PUBLIC + void init(); + + // The purpose of "execute" is handling the situation of a new entity added to + // a node, while the executor is already spinning. + // This consists in setting that entitiy's callback. + // If a entity is removed from a node, we should unset its callback + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override; + + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + RCLCPP_PUBLIC + void + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to the entities collector + /** + * \see rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to the entities collector + /** + * \see rclcpp::Executor::add_callback_group + * \return boolean whether the node from the callback group is new + */ + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /// Remove a callback group from the entities collector + /** + * \see rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Remove a callback group from the entities collector + /** + * \see rclcpp::Executor::remove_callback_group_from_map + */ + RCLCPP_PUBLIC + void + remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups(); + + /// Get manually added callback groups that belong to the entities collector + /** + * \see rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups(); + + /// Get autmatically added callback groups that belong to the entities collector + /** + * \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes(); + + /// + /** + * Get the subscription shared pointer corresponding + * to a subscription identifier + */ + RCLCPP_PUBLIC + rclcpp::SubscriptionBase::SharedPtr + get_subscription(const void * subscription_id); + + /// + /** + * Get the client shared pointer corresponding + * to a client identifier + */ + RCLCPP_PUBLIC + rclcpp::ClientBase::SharedPtr + get_client(const void * client_id); + + /// + /** + * Get the service shared pointer corresponding + * to a service identifier + */ + RCLCPP_PUBLIC + rclcpp::ServiceBase::SharedPtr + get_service(const void * service_id); + + /// + /** + * Get the waitable shared pointer corresponding + * to a waitable identifier + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_waitable(const void * waitable_id); + + /// + /** + * Add a weak pointer to a waitable + */ + RCLCPP_PUBLIC + void + add_waitable(rclcpp::Waitable::SharedPtr waitable); + + RCLCPP_PUBLIC + std::shared_ptr + take_data() + { + // This waitable doesn't handle any data + return nullptr; + } + + /// + /** + * Get the subscription qos depth corresponding + * to a subscription identifier + */ + RCLCPP_PUBLIC + size_t + get_subscription_qos_depth(const void * subscription_id); + + /// + /** + * Get the waitable qos depth corresponding + * to a waitable identifier + */ + RCLCPP_PUBLIC + size_t + get_waitable_qos_depth(const void * waitable_id); + + /// + /** + * Gets the QoS of this notify waitable. + * This is useful for the events executor, when it has to + * decide if keep pushing events from this waitable into the qeueue + */ + RCLCPP_PUBLIC + rmw_qos_profile_t + get_actual_qos() const override + { + rmw_qos_profile_t qos; + qos.depth = 0; + return qos; + } + +private: + void + set_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group); + + void + unset_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group); + + void + set_guard_condition_callback(const rcl_guard_condition_t * guard_condition); + + void + unset_guard_condition_callback(const rcl_guard_condition_t * guard_condition); + + /// Return true if the node belongs to the collector + /** + * \param[in] group_ptr a node base interface shared pointer + * \return boolean whether a node belongs the collector + */ + bool + has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; + + /// Add all callback groups that can be automatically added by any executor + /// and is not already associated with an executor from nodes + /// that are associated with executor + /** + * \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor() + */ + void + add_callback_groups_from_nodes_associated_to_executor(); + + void + set_entities_event_callbacks_from_map( + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + + typedef std::map> + WeakNodesToGuardConditionsMap; + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + + /// List of weak nodes registered in the events executor + std::list weak_nodes_; + + // Maps: entity identifiers to weak pointers from the entities registered in the executor + // so in the case of an event providing and ID, we can retrieve and own the corresponding + // entity while it performs work + std::unordered_map weak_subscriptions_map_; + std::unordered_map weak_services_map_; + std::unordered_map weak_clients_map_; + std::unordered_map weak_waitables_map_; + + /// Executor using this entities collector object + EventsExecutor * associated_executor_ = nullptr; + /// Instance of the timers manager used by the associated executor + TimersManager::SharedPtr timers_manager_; + + // Maps: entity identifiers to qos->depth from the entities registered in the executor + using QosDepthMap = std::unordered_map; + QosDepthMap qos_depth_subscriptions_map_; + QosDepthMap qos_depth_waitables_map_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp new file mode 100644 index 0000000000..b6cd58d030 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp @@ -0,0 +1,111 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ + +#include +#include + +#include "rcl/guard_condition.h" +#include "rclcpp/executors/event_waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides an EventWaitable that allows to + * wake up an EventsExecutor when a guard condition is notified. + */ +class EventsExecutorNotifyWaitable final : public EventWaitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorNotifyWaitable) + + // Constructor + RCLCPP_PUBLIC + EventsExecutorNotifyWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventsExecutorNotifyWaitable() = default; + + // The function is a no-op, since we only care of waking up the executor + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override + { + (void)data; + } + + RCLCPP_PUBLIC + void + add_guard_condition(const rcl_guard_condition_t * guard_condition) + { + notify_guard_conditions_.push_back(guard_condition); + } + + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const override + { + for (auto gc : notify_guard_conditions_) { + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + gc, + executor_callback, + executor, + this, + false); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set guard condition events callback"); + } + } + } + + RCLCPP_PUBLIC + std::shared_ptr + take_data() + { + // This waitable doesn't handle any data + return nullptr; + } + + /// + /** + * Gets the QoS of this notify waitable. + * This is useful for the events executor, when it has to + * decide if keep pushing events from this waitable into the qeueue + */ + RCLCPP_PUBLIC + rmw_qos_profile_t + get_actual_qos() const override + { + rmw_qos_profile_t qos; + qos.depth = 0; + return qos; + } + +private: + std::list notify_guard_conditions_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/executors/timers_manager.hpp b/rclcpp/include/rclcpp/executors/timers_manager.hpp new file mode 100644 index 0000000000..b4be64aee3 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/timers_manager.hpp @@ -0,0 +1,444 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_ +#define RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/context.hpp" +#include "rclcpp/timer.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides a way for storing and executing timer objects. + * It APIs to suit the needs of different applications and execution models. + * All public APIs provided by this class are thread-safe. + * + * Timers management + * This class provides APIs to add and remove timers. + * It keeps a list of weak pointers from added timers, and owns them only when + * have expired and need to be executed. + * Timers are kept ordered in a binary-heap priority queue. + * Calls to add/remove APIs will temporarily block the execution of the timers and + * will require to reorder the internal priority queue of timers. + * Because of this, they have a not-negligible impact on the performance. + * + * Timers execution + * The most efficient implementation consists in letting a TimersManager object + * to spawn a thread where timers are monitored and periodically executed. + * Besides this, other APIs allow to either execute a single timer or all the + * currently ready ones. + * This class assumes that the execute_callback API of the stored timer is never + * called by other entities, but can only be called from here. + * If this assumption is not respected, the heap property will be invalidated, + * so timers may be executed out of order. + * + */ +class TimersManager +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(TimersManager) + + /** + * @brief Construct a new TimersManager object + */ + explicit TimersManager(std::shared_ptr context); + + /** + * @brief Destruct the object making sure to stop thread and release memory. + */ + ~TimersManager(); + + /** + * @brief Adds a new TimerBase::WeakPtr to the storage. + * This object will store a weak pointer of the added timer + * in a heap data structure. + * @param timer the timer to be added + */ + void add_timer(rclcpp::TimerBase::SharedPtr timer); + + /** + * @brief Starts a thread that takes care of executing timers added to this object. + */ + void start(); + + /** + * @brief Stops the timers thread. + */ + void stop(); + + /** + * @brief Executes all the timers currently ready when the function is invoked + * while keeping the heap correctly sorted. + * @return std::chrono::nanoseconds for next timer to expire, + * the returned value could be negative if the timer is already expired + * or MAX_TIME if the heap is empty. + */ + std::chrono::nanoseconds execute_ready_timers(); + + /** + * @brief Executes one ready timer if available + * + * @return true if there was a timer ready + */ + bool execute_head_timer(); + + /** + * @brief Get the amount of time before the next timer expires. + * + * @return std::chrono::nanoseconds to wait, + * the returned value could be negative if the timer is already expired + * or MAX_TIME if the heap is empty. + */ + std::chrono::nanoseconds get_head_timeout(); + + /** + * @brief Remove all the timers stored in the object. + */ + void clear(); + + /** + * @brief Remove a single timer stored in the object, passed as a shared_ptr. + * @param timer the timer to remove. + */ + void remove_timer(rclcpp::TimerBase::SharedPtr timer); + + // This is what the TimersManager uses to denote a duration forever. + // We don't use std::chrono::nanoseconds::max because it will overflow. + // See https://en.cppreference.com/w/cpp/thread/condition_variable/wait_for + static constexpr std::chrono::nanoseconds MAX_TIME = std::chrono::hours(90); + +private: + RCLCPP_DISABLE_COPY(TimersManager) + + using TimerPtr = rclcpp::TimerBase::SharedPtr; + using WeakTimerPtr = rclcpp::TimerBase::WeakPtr; + + // Forward declaration + class TimersHeap; + + /** + * @brief This class allows to store weak pointers to timers in a heap data structure. + * The "validate_and_lock" API allows to get ownership of the timers and also makes sure that + * the heap property is respected. + * The root of the heap is the timer that expires first. + * This class is not thread safe and requires external mutexes to protect its usage. + */ + class WeakTimersHeap + { +public: + /** + * @brief Try to add a new timer to the heap. + * After the addition, the heap property is preserved. + * @param timer new timer to add + * @return true if timer has been added, false if it was already there + */ + bool add_timer(TimerPtr timer) + { + TimersHeap locked_heap = this->validate_and_lock(); + bool added = locked_heap.add_timer(std::move(timer)); + + if (added) { + // Re-create the weak heap with the new timer added + this->store(locked_heap); + } + + return added; + } + + /** + * @brief Try to remove a timer from the heap. + * After the removal, the heap property is preserved. + * @param timer timer to remove + * @return true if timer has been removed, false if it was not there + */ + bool remove_timer(TimerPtr timer) + { + TimersHeap locked_heap = this->validate_and_lock(); + bool removed = locked_heap.remove_timer(std::move(timer)); + + if (removed) { + // Re-create the weak heap with the timer removed + this->store(locked_heap); + } + + return removed; + } + + /** + * @brief This function restores the current object as a valid heap + * and it also returns a locked version of it + * @return TimersHeap owned timers corresponding to the current object + */ + TimersHeap validate_and_lock() + { + TimersHeap locked_heap; + bool any_timer_destroyed = false; + + auto it = weak_heap_.begin(); + + while (it != weak_heap_.end()) { + if (auto timer_shared_ptr = it->lock()) { + // This timer is valid, add it to the vector + locked_heap.push_back(std::move(timer_shared_ptr)); + it++; + } else { + // This timer went out of scope, remove it + it = weak_heap_.erase(it); + any_timer_destroyed = true; + } + } + + // If a timer has gone out of scope, then the remaining elements may not represent + // a valid heap anymore. We need to re heapify the timers heap. + if (any_timer_destroyed) { + locked_heap.heapify(); + // Re-create the weak heap now that elements have been heapified again + this->store(locked_heap); + } + + return locked_heap; + } + + /** + * @brief This function allows to recreate the heap of weak pointers + * from an heap of owned pointers. + * @param heap timers heap to store as weak pointers + */ + void store(const TimersHeap & heap) + { + weak_heap_.clear(); + for (auto t : heap.owned_heap_) { + weak_heap_.push_back(t); + } + } + + /** + * @brief Remove all timers from the heap. + */ + void clear() + { + weak_heap_.clear(); + } + +private: + std::vector weak_heap_; + }; + + /** + * @brief This class is the equivalent of WeakTimersHeap but with ownership of the timers. + * It can be generated by locking the weak version. + * It provides operations to manipulate the heap + */ + class TimersHeap + { +public: + /** + * @brief Try to add a new timer to the heap. + * After the addition, the heap property is preserved. + * @param timer new timer to add + * @return true if timer has been added, false if it was already there + */ + bool add_timer(TimerPtr timer) + { + // Nothing to do if the timer is already stored here + auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer); + if (it != owned_heap_.end()) { + return false; + } + + owned_heap_.push_back(std::move(timer)); + std::push_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + + return true; + } + + /** + * @brief Try to remove a timer from the heap. + * After the removal, the heap property is preserved. + * @param timer timer to remove + * @return true if timer has been removed, false if it was not there + */ + bool remove_timer(TimerPtr timer) + { + // Nothing to do if the timer is not stored here + auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer); + if (it == owned_heap_.end()) { + return false; + } + + owned_heap_.erase(it); + std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + + return true; + } + + /** + * @brief Returns a reference to the front element + */ + TimerPtr & front() + { + return owned_heap_.front(); + } + + /** + * @brief Returns a const reference to the front element + */ + const TimerPtr & front() const + { + return owned_heap_.front(); + } + + /** + * @brief Returns whether the heap is empty or not + */ + bool empty() const + { + return owned_heap_.empty(); + } + + /** + * @brief Restore a valid heap after the root value has been replaced. + */ + void heapify_root() + { + // The following code is a more efficient version of doing + // - pop_heap; pop_back; + // - push_back; push_heap; + // as it removes the need for the last push_heap + + // Push the modified element (i.e. the current root) at the bottom of the heap + owned_heap_.push_back(owned_heap_[0]); + // Exchange first and last-1 elements and reheapify + std::pop_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + // Remove last element + owned_heap_.pop_back(); + } + + /** + * @brief Completely restores the structure to a valid heap + */ + void heapify() + { + std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + } + + /** + * @brief Convenience function that allows to push an element at the bottom of the heap. + * It will not perform any check on whether the heap remains valid or not. + * Those checks are responsibility of the calling code. + * + * @param timer timer to push at the back of the data structure representing the heap + */ + void push_back(TimerPtr timer) + { + owned_heap_.push_back(timer); + } + + /** + * @brief Friend declaration to allow the store function to access the underlying + * heap container + */ + friend void WeakTimersHeap::store(const TimersHeap & heap); + +private: + /** + * @brief Comparison function between timers. + */ + static bool timer_greater(TimerPtr a, TimerPtr b) + { + return a->time_until_trigger() > b->time_until_trigger(); + } + + std::vector owned_heap_; + }; + + /** + * @brief Implements a loop that keeps executing ready timers. + * This function is executed in the timers thread. + */ + void run_timers(); + + /** + * @brief Get the amount of time before the next timer expires. + * This function is not thread safe, acquire a mutex before calling it. + * + * @return std::chrono::nanoseconds to wait, + * the returned value could be negative if the timer is already expired + * or MAX_TIME if the heap is empty. + */ + std::chrono::nanoseconds get_head_timeout_unsafe(const TimersHeap & heap) + { + if (heap.empty()) { + return MAX_TIME; + } + return (heap.front())->time_until_trigger(); + } + + /** + * @brief Executes all the timers currently ready when the function is invoked + * while keeping the heap correctly sorted. + * This function is not thread safe, acquire a mutex before calling it. + */ + void execute_ready_timers_unsafe(TimersHeap & heap); + + /** + * @brief Helper function that checks whether a timer was already ready + * at a specific timepoint + * @param timer a pointer to the timer to check for + * @param tp the timepoint to check for + * @return true if timer was ready at tp + */ + bool timer_was_ready_at_tp( + TimerPtr timer, + std::chrono::time_point tp) + { + // A ready timer will return a negative duration when calling time_until_trigger + auto time_ready = std::chrono::steady_clock::now() + timer->time_until_trigger(); + return time_ready < tp; + } + + // Thread used to run the timers monitoring and execution task + std::thread timers_thread_; + // Protects access to timers + std::mutex timers_mutex_; + // Notifies the timers thread whenever timers are added/removed + std::condition_variable timers_cv_; + // Flag used as predicate by timers_cv + bool timers_updated_ {false}; + // Indicates whether the timers thread is currently running or requested to stop + std::atomic running_ {false}; + // Context of the parent executor + std::shared_ptr context_; + // Timers heap with weak ownership + WeakTimersHeap weak_timers_heap_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__TIMERS_MANAGER_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp new file mode 100644 index 0000000000..8101029d5d --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/events_queue.hpp @@ -0,0 +1,109 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ + +#include + +#include "rclcpp/executors/events_executor_entities_collector.hpp" +#include "rclcpp/macros.hpp" + +#include "rmw/listener_event_types.h" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This abstract class is intended to be used as + * a wrapper around a queue. The derived classes should chose + * which container to use and the strategies for push and prune + * events from the queue. + */ +class EventsQueue +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue) + + /** + * @brief Destruct the object. + */ + RCLCPP_PUBLIC + virtual ~EventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rmw_listener_event_t & event) = 0; + + /** + * @brief removes front element from the queue + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). + */ + RCLCPP_PUBLIC + virtual + void + pop() = 0; + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() const = 0; + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const = 0; + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init(rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector) = 0; + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + get_all_events() = 0; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/buffers/qos_bounded_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/qos_bounded_events_queue.hpp new file mode 100644 index 0000000000..8103bffb4e --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/qos_bounded_events_queue.hpp @@ -0,0 +1,288 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ + +#include +#include + +#include "rclcpp/experimental/buffers/events_queue.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This class provides a bounded queue implementation + * based on a std::queue. Before pushing events into the queue + * checks the queue size. In case of exceeding the size it performs + * a prune of the queue. + */ +class QosBoundedEventsQueue : public EventsQueue +{ +public: + RCLCPP_PUBLIC + explicit QosBoundedEventsQueue(size_t queue_size_limit) + { + queue_size_limit_ = queue_size_limit; + } + + RCLCPP_PUBLIC + ~QosBoundedEventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rmw_listener_event_t & event) + { + event_queue_.push_back(event); + + if (event_queue_.size() >= queue_size_limit_) { + prune_queue(); + } + } + + /** + * @brief removes front element from the queue + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). + */ + RCLCPP_PUBLIC + virtual + void + pop() + { + event_queue_.pop_front(); + } + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() const + { + return event_queue_.front(); + } + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const + { + return event_queue_.empty(); + } + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init(rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector) + { + // Assign entities collector + entities_collector_ = entities_collector; + + // Make sure the queue is empty when we start + std::deque emtpy_queue; + std::swap(event_queue_, emtpy_queue); + } + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return std::queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + get_all_events() + { + // Initialize a queue with the events in deque + std::queue local_queue( + std::queue::container_type( + event_queue_.begin(), event_queue_.end())); + + // Empty the queue + std::deque emtpy_queue; + std::swap(event_queue_, emtpy_queue); + + return local_queue; + } + +private: + RCLCPP_PUBLIC + void + prune_queue() + { + // Clear maps + subscription_events_in_queue_map_.clear(); + waitable_events_in_queue_map_.clear(); + + std::cout << "prune queue" << std::endl; + + // Prune queue: + // For each entity, we get its QoS depth and use it as its events limit. + // Starting from the newer events (backward iterating the queue) we + // count events from each entity. If there are more events than the limit, + // we discard the oldest events. The + // For example, subscription A has depth = 1 / B has depth = 2 + // C has depth = 1 / D has depth = 1 + // If the queue is: + // Older Events (front of the queue) + // | D | + // | A | -> Should be removed, the msg has expired and overriden. + // | A | -> Should be removed + // | B | -> Should be removed | D | + // | C | | C | + // | B | | B | + // | A | ---> | A | + // | B | | B | + // Newer Events After pruning + // + // Even with this prunning method, after pruning we might still have more events + // in the queue than the limit set on the queue, if for example a subscription has a + // qos->depth bigger than the queue limit, and all the messages in the queue + // belonged to that subscription + std::deque::reverse_iterator rit = event_queue_.rbegin(); + + while (rit != event_queue_.rend()) { + auto event = *rit; + + switch (event.type) { + case SUBSCRIPTION_EVENT: + { + bool limit_reached = subscription_events_reached_limit(event.entity); + + if (limit_reached) { + // Remove oldest events + rit = decltype(rit)(event_queue_.erase(std::next(rit).base())); + } else { + rit++; + } + break; + } + + case SERVICE_EVENT: + case CLIENT_EVENT: + { + break; + } + + case WAITABLE_EVENT: + { + bool limit_reached = waitable_events_reached_limit(event.entity); + + if (limit_reached) { + // Remove oldest events + rit = decltype(rit)(event_queue_.erase(std::next(rit).base())); + } else { + rit++; + } + break; + } + } + } + } + + bool subscription_events_reached_limit(const void * subscription_id) + { + size_t limit = entities_collector_->get_subscription_qos_depth(subscription_id); + + // If there's no limit, return false + if (!limit) { + return false; + } + + auto it = subscription_events_in_queue_map_.find(subscription_id); + + if (it != subscription_events_in_queue_map_.end()) { + size_t & subscription_events_in_queue = it->second; + + if (subscription_events_in_queue < limit) { + // Add one event as we still didn't reach the limit + subscription_events_in_queue++; + return false; + } else { + return true; + } + } + + // If the subscription_id is not present in the map, add it with one event in the counter + subscription_events_in_queue_map_.emplace(subscription_id, 1); + return false; + } + + bool waitable_events_reached_limit(const void * waitable_id) + { + auto limit = entities_collector_->get_waitable_qos_depth(waitable_id); + + // If there's no limit, return false + if (!limit) { + return false; + } + + auto it = waitable_events_in_queue_map_.find(waitable_id); + + if (it != waitable_events_in_queue_map_.end()) { + size_t & waitable_events_in_queue = it->second; + + if (waitable_events_in_queue < limit) { + // Add one event as we still didn't reach the limit + waitable_events_in_queue++; + return false; + } else { + return true; + } + } + + // If the waitable_id is not present in the map, add it with one event in the counter + waitable_events_in_queue_map_.emplace(waitable_id, 1); + return false; + } + + // Variables + std::deque event_queue_; + size_t queue_size_limit_; + + // Maps: entity identifiers to number of events in the queue + using EventsInQueueMap = std::unordered_map; + EventsInQueueMap subscription_events_in_queue_map_; + EventsInQueueMap waitable_events_in_queue_map_; + + // Entities collector associated with executor and events queue + rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector_; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__BOUNDED_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index ad01946193..ece0e379dc 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -91,7 +91,12 @@ class RingBufferImplementation : public BufferImplementationBase if (!has_data_()) { RCLCPP_ERROR(rclcpp::get_logger("rclcpp"), "Calling dequeue on empty intra-process buffer"); - throw std::runtime_error("Calling dequeue on empty intra-process buffer"); + // This situation can happen on the EventsExecutor if we have more events in the queue + // than messages in the history cache (set by the qos_policies.depth of the subscription) + // For example if we set depth=1 and we get 2 messages really fast (so no time for processing), + // we could end up with 2 events in the queue but only 1 msg is actually stored on the buffer. + // In this case we return an empty buffer. + return BufferT(); } auto request = std::move(ring_buffer_[read_index_]); diff --git a/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp new file mode 100644 index 0000000000..d027527eea --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/buffers/simple_events_queue.hpp @@ -0,0 +1,131 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ + +#include +#include + +#include "rclcpp/experimental/buffers/events_queue.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ + +/** + * @brief This class provides a simple queue implementation + * based on a std::queue. As the objective is having a CPU peformant + * queue, it does not performs any checks about the size of + * the queue, so the queue size could grow unbounded. + * It does not implement any pruning mechanisms. + */ +class SimpleEventsQueue : public EventsQueue +{ +public: + RCLCPP_PUBLIC + ~SimpleEventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + push(const rmw_listener_event_t & event) + { + event_queue_.push(event); + } + + /** + * @brief removes front element from the queue + * The element removed is the "oldest" element in the queue whose + * value can be retrieved by calling member front(). + */ + RCLCPP_PUBLIC + virtual + void + pop() + { + event_queue_.pop(); + } + + /** + * @brief gets the front event from the queue + * @return the front event + */ + RCLCPP_PUBLIC + virtual + rmw_listener_event_t + front() const + { + return event_queue_.front(); + } + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const + { + return event_queue_.empty(); + } + + /** + * @brief Initializes the queue + */ + RCLCPP_PUBLIC + virtual + void + init(rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector) + { + (void)entities_collector; + // Make sure the queue is empty when we start + std::queue local_queue; + std::swap(event_queue_, local_queue); + } + + + /** + * @brief gets a queue with all events accumulated on it since + * the last call. The member queue is empty when the call returns. + * @return std::queue with events + */ + RCLCPP_PUBLIC + virtual + std::queue + get_all_events() + { + std::queue local_queue; + std::swap(event_queue_, local_queue); + return local_queue; + } + +private: + std::queue event_queue_; +}; + +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + + +#endif // RCLCPP__EXPERIMENTAL__BUFFERS__SIMPLE_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index c41317c705..cca9a6cc46 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -74,6 +74,12 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const override; + protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_; diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp index b3bf61cea5..d179321630 100644 --- a/rclcpp/include/rclcpp/qos_event.hpp +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -32,6 +32,11 @@ namespace rclcpp { +namespace executors +{ +class EventsExecutor; +} // namespace executors + using QOSDeadlineRequestedInfo = rmw_requested_deadline_missed_status_t; using QOSDeadlineOfferedInfo = rmw_offered_deadline_missed_status_t; using QOSLivelinessChangedInfo = rmw_liveliness_changed_status_t; @@ -102,6 +107,18 @@ class QOSEventHandlerBase : public Waitable bool is_ready(rcl_wait_set_t * wait_set) override; + /// Set EventsExecutor's callback + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const override; + + RCLCPP_PUBLIC + virtual + rmw_qos_profile_t + get_actual_qos() const override; + protected: rcl_event_t event_handle_; size_t wait_set_event_index_; diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 5c7411892d..35efd0cc8d 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -39,6 +39,11 @@ namespace rclcpp { +namespace executors +{ +class EventsExecutor; +} // namespace executors + class ServiceBase { public: @@ -121,6 +126,12 @@ class ServiceBase bool exchange_in_use_by_wait_set_state(bool in_use_state); + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const; + protected: RCLCPP_DISABLE_COPY(ServiceBase) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 28b92ffb13..ea60a062cc 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -16,6 +16,7 @@ #define RCLCPP__SUBSCRIPTION_BASE_HPP_ #include +#include #include #include #include @@ -45,6 +46,11 @@ namespace node_interfaces class NodeBaseInterface; } // namespace node_interfaces +namespace executors +{ +class EventsExecutor; +} // namespace executors + namespace experimental { /** @@ -263,6 +269,12 @@ class SubscriptionBase : public std::enable_shared_from_this bool exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state); + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const; + protected: template void diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index 85deabda85..96940a096c 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -26,6 +26,11 @@ namespace rclcpp { +namespace executors +{ +class EventsExecutor; +} // namespace executors + class Waitable { public: @@ -203,6 +208,18 @@ class Waitable bool exchange_in_use_by_wait_set_state(bool in_use_state); + RCLCPP_PUBLIC + virtual + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const; + + RCLCPP_PUBLIC + virtual + rmw_qos_profile_t + get_actual_qos() const; + private: std::atomic in_use_by_wait_set_{false}; }; // class Waitable diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index 6a1d3934d4..789919c0b6 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -198,3 +198,19 @@ ClientBase::exchange_in_use_by_wait_set_state(bool in_use_state) { return in_use_by_wait_set_.exchange(in_use_state); } + +void +ClientBase::set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const +{ + rcl_ret_t ret = rcl_client_set_listener_callback( + client_handle_.get(), + executor_callback, + executor, + this); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set the EventsExecutor's callback to client"); + } +} diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp new file mode 100644 index 0000000000..d0857d949e --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -0,0 +1,356 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions/exceptions.hpp" +#include "rclcpp/executors/events_executor.hpp" + +using namespace std::chrono_literals; + +using rclcpp::executors::EventsExecutor; + +EventsExecutor::EventsExecutor( + rclcpp::experimental::buffers::EventsQueue::UniquePtr events_queue, + const rclcpp::ExecutorOptions & options) +: rclcpp::Executor(options) +{ + timers_manager_ = std::make_shared(context_); + entities_collector_ = std::make_shared(this); + entities_collector_->init(); + + + // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. + // The added guard conditions are guaranteed to not go out of scope before the executor itself. + executor_notifier_ = std::make_shared(); + executor_notifier_->add_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition()); + executor_notifier_->add_guard_condition(&interrupt_guard_condition_); + executor_notifier_->set_events_executor_callback(this, &EventsExecutor::push_event); + entities_collector_->add_waitable(executor_notifier_); + + // Get ownership of the queue used to store events. + events_queue_ = std::move(events_queue); + // Init the events queue + events_queue_->init(entities_collector_); +} + +void +EventsExecutor::spin() +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false);); + + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !events_queue_->empty();}; + + // Local event queue to allow entities to push events while we execute them + EventQueue execution_event_queue; + + timers_manager_->start(); + + while (rclcpp::ok(context_) && spinning.load()) { + std::unique_lock push_lock(push_mutex_); + // We wait here until something has been pushed to the event queue + events_queue_cv_.wait(push_lock, has_event_predicate); + // Local event queue to allow entities to push events while we execute them + execution_event_queue = events_queue_->get_all_events(); + // Unlock the mutex + push_lock.unlock(); + // Consume all available events, this queue will be empty at the end of the function + this->consume_all_events(execution_event_queue); + } + timers_manager_->stop(); +} + +void +EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin_some() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false);); + + // In this context a 0 input max_duration means no duration limit + if (std::chrono::nanoseconds(0) == max_duration) { + max_duration = timers_manager_->MAX_TIME; + } + + // This function will wait until the first of the following events occur: + // - The input max_duration is elapsed + // - A timer triggers + // - An executor event is received and processed + + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !events_queue_->empty();}; + + + // Select the smallest between input max_duration and timer timeout + auto next_timer_timeout = timers_manager_->get_head_timeout(); + if (next_timer_timeout < max_duration) { + max_duration = next_timer_timeout; + } + + std::unique_lock push_lock(push_mutex_); + // Wait until timeout or event + events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); + // Local event queue to allow entities to push events while we execute them + EventQueue execution_event_queue = events_queue_->get_all_events(); + // We don't need the lock anymore + push_lock.unlock(); + + // Execute all ready timers + timers_manager_->execute_ready_timers(); + // Consume all available events, this queue will be empty at the end of the function + this->consume_all_events(execution_event_queue); +} + +void +EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) +{ + if (max_duration <= 0ns) { + throw std::invalid_argument("max_duration must be positive"); + } + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_some() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false);); + + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !events_queue_->empty();}; + + // Local event queue to allow entities to push events while we execute them + EventQueue execution_event_queue; + + auto start = std::chrono::steady_clock::now(); + auto max_duration_not_elapsed = [max_duration, start]() { + auto elapsed_time = std::chrono::steady_clock::now() - start; + return elapsed_time < max_duration; + }; + + // Select the smallest between input max duration and timer timeout + auto next_timer_timeout = timers_manager_->get_head_timeout(); + if (next_timer_timeout < max_duration) { + max_duration = next_timer_timeout; + } + + { + // Wait once until timeout or event + std::unique_lock push_lock(push_mutex_); + events_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); + } + + auto timeout = timers_manager_->get_head_timeout(); + + // Keep executing until no more work to do or timeout expired + while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { + std::unique_lock push_lock(push_mutex_); + execution_event_queue = events_queue_->get_all_events(); + push_lock.unlock(); + + // Exit if there is no more work to do + const bool ready_timer = timeout < 0ns; + const bool has_events = !execution_event_queue.empty(); + if (!ready_timer && !has_events) { + break; + } + + // Execute all ready work + timeout = timers_manager_->execute_ready_timers(); + this->consume_all_events(execution_event_queue); + } +} + +void +EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) +{ + // In this context a negative input timeout means no timeout + if (timeout < 0ns) { + timeout = timers_manager_->MAX_TIME; + } + + // Select the smallest between input timeout and timer timeout + auto next_timer_timeout = timers_manager_->get_head_timeout(); + if (next_timer_timeout < timeout) { + timeout = next_timer_timeout; + } + + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !events_queue_->empty();}; + + rmw_listener_event_t event; + bool has_event = false; + + { + // Wait until timeout or event arrives + std::unique_lock lock(push_mutex_); + events_queue_cv_.wait_for(lock, timeout, has_event_predicate); + + // Grab first event from queue if it exists + has_event = !events_queue_->empty(); + if (has_event) { + event = events_queue_->front(); + events_queue_->pop(); + } + } + + // If we wake up from the wait with an event, it means that it + // arrived before any of the timers expired. + if (has_event) { + this->execute_event(event); + } else { + timers_manager_->execute_head_timer(); + } +} + +void +EventsExecutor::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + // This field is unused because we don't have to wake up the executor when a node is added. + (void) notify; + + // Add node to entities collector + entities_collector_->add_node(node_ptr); +} + +void +EventsExecutor::add_node(std::shared_ptr node_ptr, bool notify) +{ + this->add_node(node_ptr->get_node_base_interface(), notify); +} + +void +EventsExecutor::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + // This field is unused because we don't have to wake up the executor when a node is removed. + (void)notify; + + // Remove node from entities collector. + // This will result in un-setting all the event callbacks from its entities. + // After this function returns, this executor will not receive any more events associated + // to these entities. + entities_collector_->remove_node(node_ptr); +} + +void +EventsExecutor::remove_node(std::shared_ptr node_ptr, bool notify) +{ + this->remove_node(node_ptr->get_node_base_interface(), notify); +} + +void +EventsExecutor::consume_all_events(EventQueue & event_queue) +{ + while (!event_queue.empty()) { + rmw_listener_event_t event = event_queue.front(); + event_queue.pop(); + + this->execute_event(event); + } +} + +void +EventsExecutor::execute_event(const rmw_listener_event_t & event) +{ + switch (event.type) { + case SUBSCRIPTION_EVENT: + { + auto subscription = entities_collector_->get_subscription(event.entity); + + if (subscription) { + execute_subscription(subscription); + } + break; + } + + case SERVICE_EVENT: + { + auto service = entities_collector_->get_service(event.entity); + + if (service) { + execute_service(service); + } + break; + } + + case CLIENT_EVENT: + { + auto client = entities_collector_->get_client(event.entity); + + if (client) { + execute_client(client); + } + break; + } + + case WAITABLE_EVENT: + { + auto waitable = entities_collector_->get_waitable(event.entity); + + if (waitable) { + auto data = waitable->take_data(); + waitable->execute(data); + } + break; + } + } +} + +void +EventsExecutor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) +{ + // This field is unused because we don't have to wake up + // the executor when a callback group is added. + (void)notify; + entities_collector_->add_callback_group(group_ptr, node_ptr); +} + +void +EventsExecutor::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) +{ + // This field is unused because we don't have to wake up + // the executor when a callback group is removed. + (void)notify; + entities_collector_->remove_callback_group(group_ptr); +} + +std::vector +EventsExecutor::get_all_callback_groups() +{ + return entities_collector_->get_all_callback_groups(); +} + +std::vector +EventsExecutor::get_manually_added_callback_groups() +{ + return entities_collector_->get_manually_added_callback_groups(); +} + +std::vector +EventsExecutor::get_automatically_added_callback_groups_from_nodes() +{ + return entities_collector_->get_automatically_added_callback_groups_from_nodes(); +} diff --git a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp new file mode 100644 index 0000000000..0d05ba9b76 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp @@ -0,0 +1,631 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "rclcpp/executors/events_executor.hpp" +#include "rclcpp/executors/events_executor_entities_collector.hpp" + +using rclcpp::executors::EventsExecutorEntitiesCollector; + +EventsExecutorEntitiesCollector::EventsExecutorEntitiesCollector( + EventsExecutor * executor) +{ + if (executor == nullptr) { + throw std::runtime_error("Received NULL executor in EventsExecutorEntitiesCollector."); + } + + associated_executor_ = executor; + timers_manager_ = associated_executor_->timers_manager_; +} + +EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector() +{ + // Disassociate all callback groups + for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + unset_callback_group_entities_callbacks(group); + } + } + for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + unset_callback_group_entities_callbacks(group); + } + } + + // Disassociate all nodes + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + + // Unset nodes notify guard condition executor callback + for (const auto & pair : weak_nodes_to_guard_conditions_) { + auto node = pair.first.lock(); + if (node) { + auto node_gc = pair.second; + unset_guard_condition_callback(node_gc); + } + } + + // Clear all containers + weak_nodes_.clear(); + weak_clients_map_.clear(); + weak_services_map_.clear(); + weak_waitables_map_.clear(); + weak_subscriptions_map_.clear(); + qos_depth_waitables_map_.clear(); + qos_depth_subscriptions_map_.clear(); + weak_nodes_to_guard_conditions_.clear(); + weak_groups_associated_with_executor_to_nodes_.clear(); + weak_groups_to_nodes_associated_with_executor_.clear(); +} + +void +EventsExecutorEntitiesCollector::init() +{ + // Add the EventsExecutorEntitiesCollector shared_ptr to waitables map + add_waitable(this->shared_from_this()); +} + +void +EventsExecutorEntitiesCollector::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + // Check if the node already has an executor and if not, set this to true + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } + + // Get node callback groups, add them to weak_groups_to_nodes_associated_with_executor_ + for (const auto & weak_group : node_ptr->get_callback_groups()) { + auto group_ptr = weak_group.lock(); + if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + add_callback_group(group_ptr, node_ptr, weak_groups_to_nodes_associated_with_executor_); + } + } + + // Add node to weak_nodes_ + weak_nodes_.push_back(node_ptr); +} + +void +EventsExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_); +} + +void +EventsExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + // If the callback_group already has an executor, throw error + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + + bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); + + if (is_new_node) { + // Set an event callback for the node's notify guard condition, so if new entities are added + // or removed to this node we will receive an event. + set_guard_condition_callback(node_ptr->get_notify_guard_condition()); + + // Store node's notify guard condition + weak_nodes_to_guard_conditions_[node_ptr] = node_ptr->get_notify_guard_condition(); + } + + // Add callback group to weak_groups_to_node + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + + auto insert_info = weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr)); + + // Throw error if the group was already registered in the executor + bool was_inserted = insert_info.second; + if (!was_inserted) { + throw std::runtime_error("Callback group was already added to executor."); + } + + // For all entities in the callback group, set their event callback + set_callback_group_entities_callbacks(group_ptr); +} + +void +EventsExecutorEntitiesCollector::execute(std::shared_ptr & data) +{ + (void)data; + // This function is called when the associated executor is notified that something changed. + // We do not know if an entity has been added or removed so we have to rebuild everything. + + timers_manager_->clear(); + + // If a registered node has a new callback group, register the group. + add_callback_groups_from_nodes_associated_to_executor(); + + // For all groups registered in the executor, set their event callbacks. + set_entities_event_callbacks_from_map(weak_groups_associated_with_executor_to_nodes_); + set_entities_event_callbacks_from_map(weak_groups_to_nodes_associated_with_executor_); +} + +void +EventsExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor() +{ + // Register new callback groups added to a node while already spinning + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + auto group_ptrs = node->get_callback_groups(); + std::for_each( + group_ptrs.begin(), group_ptrs.end(), + [this, node](rclcpp::CallbackGroup::WeakPtr group_ptr) + { + auto shared_group_ptr = group_ptr.lock(); + if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) + { + add_callback_group( + shared_group_ptr, + node, + weak_groups_to_nodes_associated_with_executor_); + } + }); + } + } +} + +void +EventsExecutorEntitiesCollector::set_entities_event_callbacks_from_map( + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!node || !group || !group->can_be_taken_from().load()) { + continue; + } + set_callback_group_entities_callbacks(group); + } +} + +void +EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( + rclcpp::CallbackGroup::SharedPtr group) +{ + // Timers are handled by the timers manager + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + timers_manager_->add_timer(timer); + } + return false; + }); + + // Set callbacks for all other entity types + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + qos_depth_subscriptions_map_.emplace( + subscription.get(), subscription->get_actual_qos().depth()); + + subscription->set_events_executor_callback( + associated_executor_, + &EventsExecutor::push_event); + + weak_subscriptions_map_.emplace(subscription.get(), subscription); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + service->set_events_executor_callback( + associated_executor_, + &EventsExecutor::push_event); + weak_services_map_.emplace(service.get(), service); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + client->set_events_executor_callback( + associated_executor_, + &EventsExecutor::push_event); + weak_clients_map_.emplace(client.get(), client); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + qos_depth_waitables_map_.emplace( + waitable.get(), waitable->get_actual_qos().depth); + + waitable->set_events_executor_callback( + associated_executor_, + &EventsExecutor::push_event); + + weak_waitables_map_.emplace(waitable.get(), waitable); + } + return false; + }); +} + +void +EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( + rclcpp::CallbackGroup::SharedPtr group) +{ + // Timers are handled by the timers manager + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + timers_manager_->remove_timer(timer); + } + return false; + }); + + // Unset callbacks for all other entity types + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + subscription->set_events_executor_callback(nullptr, nullptr); + weak_subscriptions_map_.erase(subscription.get()); + qos_depth_subscriptions_map_.erase(subscription.get()); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + service->set_events_executor_callback(nullptr, nullptr); + weak_services_map_.erase(service.get()); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + client->set_events_executor_callback(nullptr, nullptr); + weak_clients_map_.erase(client.get()); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + waitable->set_events_executor_callback(nullptr, nullptr); + weak_waitables_map_.erase(waitable.get()); + qos_depth_waitables_map_.erase(waitable.get()); + } + return false; + }); +} + +void +EventsExecutorEntitiesCollector::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + this->remove_callback_group_from_map( + group_ptr, + weak_groups_associated_with_executor_to_nodes_); +} + +void +EventsExecutorEntitiesCollector::remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + + // Look for the group to remove in the map + auto iter = weak_groups_to_nodes.find(weak_group_ptr); + if (iter == weak_groups_to_nodes.end()) { + // Group not found. + throw std::runtime_error("Callback group needs to be associated with this executor."); + } + + // Group found, get its associated node. + node_ptr = iter->second.lock(); + if (node_ptr == nullptr) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + // Remove group from map + weak_groups_to_nodes.erase(iter); + + // For all the entities in the group, unset their callbacks + unset_callback_group_entities_callbacks(group_ptr); + + // Check if this node still has other callback groups associated with the executor + bool node_has_associated_callback_groups = + has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) || + has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); + + if (!node_has_associated_callback_groups) { + // Node doesn't have more callback groups associated to the executor. + // Unset the event callback for the node's notify guard condition, to stop + // receiving events if entities are added or removed to this node. + unset_guard_condition_callback(node_ptr->get_notify_guard_condition()); + + // Remove guard condition from list + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr(node_ptr); + weak_nodes_to_guard_conditions_.erase(weak_node_ptr); + } +} + +void +EventsExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + if (!node_ptr->get_associated_with_executor_atomic().load()) { + throw std::runtime_error("Node needs to be associated with an executor."); + return; + } + // Check if this node is currently stored here + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + weak_nodes_.erase(node_it); + break; + } + ++node_it; + } + if (node_it == weak_nodes_.end()) { + // The node is not stored here + throw std::runtime_error("Tried to remove node not stored in this executor."); + return; + } + + // Find callback groups belonging to the node to remove + std::vector found_group_ptrs; + std::for_each( + weak_groups_to_nodes_associated_with_executor_.begin(), + weak_groups_to_nodes_associated_with_executor_.end(), + [&found_group_ptrs, node_ptr](std::pair key_value_pair) { + auto & weak_node_ptr = key_value_pair.second; + auto shared_node_ptr = weak_node_ptr.lock(); + auto group_ptr = key_value_pair.first.lock(); + if (shared_node_ptr == node_ptr) { + found_group_ptrs.push_back(group_ptr); + } + }); + // Remove those callback groups + std::for_each( + found_group_ptrs.begin(), found_group_ptrs.end(), [this] + (rclcpp::CallbackGroup::SharedPtr group_ptr) { + this->remove_callback_group_from_map( + group_ptr, + weak_groups_to_nodes_associated_with_executor_); + }); + + // Set that the node does not have an executor anymore + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); +} + +// Returns true if the map has the node_ptr +bool +EventsExecutorEntitiesCollector::has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const +{ + return std::find_if( + weak_groups_to_nodes.begin(), + weak_groups_to_nodes.end(), + [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { + auto other_ptr = other.second.lock(); + return other_ptr == node_ptr; + }) != weak_groups_to_nodes.end(); +} + +std::vector +EventsExecutorEntitiesCollector::get_all_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +EventsExecutorEntitiesCollector::get_manually_added_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +EventsExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +void +EventsExecutorEntitiesCollector::set_guard_condition_callback( + const rcl_guard_condition_t * guard_condition) +{ + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + guard_condition, + &EventsExecutor::push_event, + associated_executor_, + this, + false /* Discard previous events */); + + if (ret != RCL_RET_OK) { + throw std::runtime_error("Couldn't set guard condition event callback"); + } +} + +void +EventsExecutorEntitiesCollector::unset_guard_condition_callback( + const rcl_guard_condition_t * guard_condition) +{ + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + guard_condition, + nullptr, + nullptr, + nullptr, + false /* Discard previous events */); + + if (ret != RCL_RET_OK) { + throw std::runtime_error("Couldn't unset guard condition event callback"); + } +} + +rclcpp::SubscriptionBase::SharedPtr +EventsExecutorEntitiesCollector::get_subscription(const void * subscription_id) +{ + auto it = weak_subscriptions_map_.find(subscription_id); + + if (it != weak_subscriptions_map_.end()) { + auto subscription_weak_ptr = it->second; + auto subscription_shared_ptr = subscription_weak_ptr.lock(); + + if (subscription_shared_ptr) { + return subscription_shared_ptr; + } + + // The subscription expired, remove from map + weak_subscriptions_map_.erase(it); + qos_depth_subscriptions_map_.erase(subscription_id); + } + return nullptr; +} + +rclcpp::ClientBase::SharedPtr +EventsExecutorEntitiesCollector::get_client(const void * client_id) +{ + auto it = weak_clients_map_.find(client_id); + + if (it != weak_clients_map_.end()) { + auto client_weak_ptr = it->second; + auto client_shared_ptr = client_weak_ptr.lock(); + + if (client_shared_ptr) { + return client_shared_ptr; + } + + // The client expired, remove from map + weak_clients_map_.erase(it); + } + return nullptr; +} + +rclcpp::ServiceBase::SharedPtr +EventsExecutorEntitiesCollector::get_service(const void * service_id) +{ + auto it = weak_services_map_.find(service_id); + + if (it != weak_services_map_.end()) { + auto service_weak_ptr = it->second; + auto service_shared_ptr = service_weak_ptr.lock(); + + if (service_shared_ptr) { + return service_shared_ptr; + } + + // The service expired, remove from map + weak_services_map_.erase(it); + } + return nullptr; +} + +rclcpp::Waitable::SharedPtr +EventsExecutorEntitiesCollector::get_waitable(const void * waitable_id) +{ + auto it = weak_waitables_map_.find(waitable_id); + + if (it != weak_waitables_map_.end()) { + auto waitable_weak_ptr = it->second; + auto waitable_shared_ptr = waitable_weak_ptr.lock(); + + if (waitable_shared_ptr) { + return waitable_shared_ptr; + } + + // The waitable expired, remove from map + weak_waitables_map_.erase(it); + qos_depth_waitables_map_.erase(waitable_id); + } + return nullptr; +} + +void +EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitable) +{ + qos_depth_waitables_map_.emplace(waitable.get(), waitable->get_actual_qos().depth); + weak_waitables_map_.emplace(waitable.get(), waitable); +} + +size_t +EventsExecutorEntitiesCollector::get_subscription_qos_depth(const void * subscription_id) +{ + auto it = qos_depth_subscriptions_map_.find(subscription_id); + + if (it != qos_depth_subscriptions_map_.end()) { + size_t qos_depth = it->second; + return qos_depth; + } + + // If the subscription_id is not present in the map, throw error + throw std::runtime_error("Event from subscription not registered in map!"); +} + +size_t +EventsExecutorEntitiesCollector::get_waitable_qos_depth(const void * waitable_id) +{ + auto it = qos_depth_waitables_map_.find(waitable_id); + + if (it != qos_depth_waitables_map_.end()) { + size_t qos_depth = it->second; + return qos_depth; + } + + // If the waitable_id is not present in the map, throw error + throw std::runtime_error("Event from waitable not registered in map!"); +} \ No newline at end of file diff --git a/rclcpp/src/rclcpp/executors/timers_manager.cpp b/rclcpp/src/rclcpp/executors/timers_manager.cpp new file mode 100644 index 0000000000..7e29ec0f23 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/timers_manager.cpp @@ -0,0 +1,232 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/executors/timers_manager.hpp" + +using rclcpp::executors::TimersManager; + +constexpr std::chrono::nanoseconds TimersManager::MAX_TIME; + +TimersManager::TimersManager(std::shared_ptr context) +{ + context_ = context; +} + +TimersManager::~TimersManager() +{ + // Remove all timers + this->clear(); + + // Make sure timers thread is stopped before destroying this object + this->stop(); +} + +void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer) +{ + if (!timer) { + throw std::invalid_argument("TimersManager::add_timer() trying to add nullptr timer"); + } + + bool added = false; + { + std::unique_lock timers_lock(timers_mutex_); + added = weak_timers_heap_.add_timer(timer); + timers_updated_ = timers_updated_ || added; + } + + if (added) { + // Notify that a timer has been added + timers_cv_.notify_one(); + } +} + +void TimersManager::start() +{ + // Make sure that the thread is not already running + if (running_.exchange(true)) { + throw std::runtime_error("TimersManager::start() can't start timers thread as already running"); + } + + timers_thread_ = std::thread(&TimersManager::run_timers, this); + pthread_setname_np(timers_thread_.native_handle(), "TimersManager"); +} + +void TimersManager::stop() +{ + // Nothing to do if the timers thread is not running + // or if another thred already signaled to stop. + if (!running_.exchange(false)) { + return; + } + + // Notify the timers manager thread to wake up + { + std::unique_lock timers_lock(timers_mutex_); + timers_updated_ = true; + } + timers_cv_.notify_one(); + + // Join timers thread if it's running + if (timers_thread_.joinable()) { + timers_thread_.join(); + } +} + +std::chrono::nanoseconds TimersManager::get_head_timeout() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "TimersManager::get_head_timeout() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + TimersHeap timers_heap = weak_timers_heap_.validate_and_lock(); + return this->get_head_timeout_unsafe(timers_heap); +} + +std::chrono::nanoseconds TimersManager::execute_ready_timers() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "TimersManager::execute_ready_timers() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + + TimersHeap timers_heap = weak_timers_heap_.validate_and_lock(); + this->execute_ready_timers_unsafe(timers_heap); + weak_timers_heap_.store(timers_heap); + + return this->get_head_timeout_unsafe(timers_heap); +} + +bool TimersManager::execute_head_timer() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "TimersManager::execute_head_timer() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + + TimersHeap timers_heap = weak_timers_heap_.validate_and_lock(); + // Nothing to do if we don't have any timer + if (timers_heap.empty()) { + return false; + } + + TimerPtr head = timers_heap.front(); + if (head->is_ready()) { + // Head timer is ready, execute + head->execute_callback(); + timers_heap.heapify_root(); + weak_timers_heap_.store(timers_heap); + return true; + } else { + // Head timer was not ready yet + return false; + } +} + +void TimersManager::execute_ready_timers_unsafe(TimersHeap & heap) +{ + // Nothing to do if we don't have any timer + if (heap.empty()) { + return; + } + + // Keep executing timers until they are ready and they were already ready when we started. + // The second check prevents this function from blocking indefinitely if the + // time required for executing the timers is longer than their period. + + TimerPtr head = heap.front(); + auto start_time = std::chrono::steady_clock::now(); + while (head->is_ready() && this->timer_was_ready_at_tp(head, start_time)) { + // Execute head timer + head->execute_callback(); + // Executing a timer will result in updating its time_until_trigger, so re-heapify + heap.heapify_root(); + // Get new head timer + head = heap.front(); + } +} + +void TimersManager::run_timers() +{ + std::chrono::nanoseconds time_to_sleep; + { + std::unique_lock lock(timers_mutex_); + TimersHeap timers_heap = weak_timers_heap_.validate_and_lock(); + time_to_sleep = this->get_head_timeout_unsafe(timers_heap); + } + + while (rclcpp::ok(context_) && running_) { + // Lock mutex + std::unique_lock timers_lock(timers_mutex_); + + // Wait until timeout or notification that timers have been updated + timers_cv_.wait_for(timers_lock, time_to_sleep, [this]() {return timers_updated_;}); + // Reset timers updated flag + timers_updated_ = false; + + // Get ownership of timers + TimersHeap timers_heap = weak_timers_heap_.validate_and_lock(); + // Execute timers + this->execute_ready_timers_unsafe(timers_heap); + // Store updated order of elements to efficiently re-use it next iteration + weak_timers_heap_.store(timers_heap); + // Get next timeout + time_to_sleep = this->get_head_timeout_unsafe(timers_heap); + } + + // Make sure the running flag is set to false when we exit from this function + // to allow restarting the timers thread. + running_ = false; +} + +void TimersManager::clear() +{ + { + // Lock mutex and then clear all data structures + std::unique_lock timers_lock(timers_mutex_); + weak_timers_heap_.clear(); + + timers_updated_ = true; + } + + // Notify timers thread such that it can re-compute its timeout + timers_cv_.notify_one(); +} + +void TimersManager::remove_timer(TimerPtr timer) +{ + bool removed = false; + { + std::unique_lock timers_lock(timers_mutex_); + removed = weak_timers_heap_.remove_timer(timer); + + timers_updated_ = timers_updated_ || removed; + } + + if (removed) { + // Notify timers thread such that it can re-compute its timeout + timers_cv_.notify_one(); + } +} diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp index 8af3918c48..933441e8ae 100644 --- a/rclcpp/src/rclcpp/qos_event.cpp +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -68,4 +68,29 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) return wait_set->events[wait_set_event_index_] == &event_handle_; } +void +QOSEventHandlerBase::set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const +{ + rcl_ret_t ret = rcl_event_set_listener_callback( + &event_handle_, + executor_callback, + executor, + this, + false /* Discard previous events */); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set EventsExecutor's callback in QOSEventHandlerBase"); + } +} + +rmw_qos_profile_t +QOSEventHandlerBase::get_actual_qos() const +{ + rmw_qos_profile_t qos; + qos.depth = 0; + return qos; +} + } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index 827062bdd3..766de70826 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -84,3 +84,19 @@ ServiceBase::exchange_in_use_by_wait_set_state(bool in_use_state) { return in_use_by_wait_set_.exchange(in_use_state); } + +void +ServiceBase::set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const +{ + rcl_ret_t ret = rcl_service_set_listener_callback( + service_handle_.get(), + executor_callback, + executor, + this); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set the EventsExecutor's callback to service"); + } +} diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 824d0f2f85..0eb2b3a62e 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -288,3 +288,19 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( } throw std::runtime_error("given pointer_to_subscription_part does not match any part"); } + +void +SubscriptionBase::set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const +{ + rcl_ret_t ret = rcl_subscription_set_listener_callback( + subscription_handle_.get(), + executor_callback, + executor, + this); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set the EventsExecutor's callback to subscription"); + } +} diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index 3b951f34de..876e73ff8b 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -36,3 +36,20 @@ SubscriptionIntraProcessBase::get_actual_qos() const { return qos_profile_; } + +void +SubscriptionIntraProcessBase::set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const +{ + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + &gc_, + executor_callback, + executor, + this, + true /*Use previous events*/); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set guard condition callback"); + } +} diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index b76c7215e0..38e7ff4038 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rclcpp/waitable.hpp" using rclcpp::Waitable; @@ -57,3 +59,22 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state) { return in_use_by_wait_set_.exchange(in_use_state); } + +void +Waitable::set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const +{ + (void)executor; + (void)executor_callback; + + throw std::runtime_error( + "Custom waitables should override set_events_executor_callback() to use events executor"); +} + +rmw_qos_profile_t +Waitable::get_actual_qos() const +{ + throw std::runtime_error( + "Custom waitables should override get_actual_qos() to use events executor"); +} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index a1005836a0..13d41a7fca 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -559,6 +559,33 @@ if(TARGET test_executors) target_link_libraries(test_executors ${PROJECT_NAME}) endif() +ament_add_gtest(test_events_executor executors/test_events_executor.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_executor) + ament_target_dependencies(test_events_executor + "rcl" + "test_msgs") + target_link_libraries(test_events_executor ${PROJECT_NAME} mimick) +endif() + +ament_add_gtest(test_events_queue executors/test_events_queue.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_queue) + ament_target_dependencies(test_events_queue + "rcl" + "test_msgs") + target_link_libraries(test_events_queue ${PROJECT_NAME} mimick) +endif() + +ament_add_gtest(test_events_executor_entities_collector executors/test_events_executor_entities_collector.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_executor_entities_collector) + ament_target_dependencies(test_events_executor_entities_collector + "rcl" + "test_msgs") + target_link_libraries(test_events_executor_entities_collector ${PROJECT_NAME} mimick) +endif() + ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_static_single_threaded_executor) @@ -584,6 +611,14 @@ if(TARGET test_static_executor_entities_collector) target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_timers_manager executors/test_timers_manager.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_timers_manager) + ament_target_dependencies(test_timers_manager + "rcl") + target_link_libraries(test_timers_manager ${PROJECT_NAME}) +endif() + ament_add_gtest(test_guard_condition test_guard_condition.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_guard_condition) diff --git a/rclcpp/test/rclcpp/executors/test_events_executor.cpp b/rclcpp/test/rclcpp/executors/test_events_executor.cpp new file mode 100644 index 0000000000..8fad5b40b0 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_executor.cpp @@ -0,0 +1,365 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include "rclcpp/executors/events_executor.hpp" + +#include "test_msgs/srv/empty.hpp" +#include "test_msgs/msg/empty.hpp" + +#include "../../mocking_utils/patch.hpp" + +using namespace std::chrono_literals; + +using rclcpp::executors::EventsExecutor; +using rclcpp::executors::EventsExecutorNotifyWaitable; + +class TestEventsExecutor : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestEventsExecutor, notify_waitable) +{ + auto notifier = std::make_shared(); + + // Waitset methods can't be used on EventsWaitable + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + EXPECT_THROW(notifier->add_to_wait_set(&wait_set), std::runtime_error); + EXPECT_THROW(notifier->is_ready(&wait_set), std::runtime_error); + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_guard_condition_set_listener_callback, RCL_RET_ERROR); + EXPECT_THROW(std::make_shared(), std::runtime_error); + } +} + +TEST_F(TestEventsExecutor, run_clients_servers) +{ + auto node = std::make_shared("node"); + + bool request_received = false; + bool response_received = false; + auto service = + node->create_service( + "service", + [&request_received]( + const test_msgs::srv::Empty::Request::SharedPtr, + test_msgs::srv::Empty::Response::SharedPtr) + { + request_received = true; + }); + auto client = node->create_client("service"); + + EventsExecutor executor; + executor.add_node(node); + + bool spin_exited = false; + std::thread spinner([&spin_exited, &executor, this]() { + executor.spin(); + spin_exited = true; + }); + + auto request = std::make_shared(); + client->async_send_request( + request, + [&response_received](rclcpp::Client::SharedFuture result_future) { + (void)result_future; + response_received = true; + }); + + // Wait some time for the client-server to be invoked + auto start = std::chrono::steady_clock::now(); + while ( + !response_received && + !spin_exited && + (std::chrono::steady_clock::now() - start < 1s)) + { + std::this_thread::sleep_for(5ms); + } + + executor.cancel(); + spinner.join(); + executor.remove_node(node); + + EXPECT_TRUE(request_received); + EXPECT_TRUE(response_received); + EXPECT_TRUE(spin_exited); +} + +TEST_F(TestEventsExecutor, spin_once_max_duration) +{ + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_once(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_once(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } +} + +TEST_F(TestEventsExecutor, spin_some_max_duration) +{ + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_some(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_some(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } +} + +TEST_F(TestEventsExecutor, spin_some_zero_duration) +{ + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 20ms, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + executor.spin_some(0ms); + + EXPECT_EQ(1u, t_runs); +} + +TEST_F(TestEventsExecutor, spin_all_max_duration) +{ + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_all(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_all(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + EventsExecutor executor; + EXPECT_THROW(executor.spin_all(0ms), std::invalid_argument); + EXPECT_THROW(executor.spin_all(-5ms), std::invalid_argument); +} + +TEST_F(TestEventsExecutor, cancel_while_timers_running) +{ + auto node = std::make_shared("node"); + + size_t t1_runs = 0; + auto t1 = node->create_wall_timer( + 1ms, + [&]() { + t1_runs++; + std::this_thread::sleep_for(25ms); + }); + + size_t t2_runs = 0; + auto t2 = node->create_wall_timer( + 1ms, + [&]() { + t2_runs++; + std::this_thread::sleep_for(25ms); + }); + + EventsExecutor executor; + executor.add_node(node); + + std::thread spinner([&executor, this]() {executor.spin();}); + + std::this_thread::sleep_for(10ms); + // Call cancel while t1 callback is still being executed + executor.cancel(); + spinner.join(); + + // Depending on the latency on the system, t2 may start to execute before cancel is signaled + EXPECT_GE(1u, t1_runs); + EXPECT_GE(1u, t2_runs); +} + +TEST_F(TestEventsExecutor, cancel_while_timers_waiting) +{ + auto node = std::make_shared("node"); + + size_t t1_runs = 0; + auto t1 = node->create_wall_timer( + 100s, + [&]() { + t1_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + std::thread spinner([&executor, this]() {executor.spin();}); + + std::this_thread::sleep_for(10ms); + executor.cancel(); + spinner.join(); + + EXPECT_EQ(0u, t1_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 1s); +} + +TEST_F(TestEventsExecutor, destroy_entities) +{ + // Create a publisher node and start publishing messages + auto node_pub = std::make_shared("node_pub"); + auto publisher = node_pub->create_publisher("topic", rclcpp::QoS(10)); + auto timer = node_pub->create_wall_timer( + 2ms, [&]() {publisher->publish(std::make_unique());}); + EventsExecutor executor_pub; + executor_pub.add_node(node_pub); + std::thread spinner([&executor_pub, this]() {executor_pub.spin();}); + + // Create a node with two different subscriptions to the topic + auto node_sub = std::make_shared("node_sub"); + size_t callback_count_1 = 0; + auto subscription_1 = + node_sub->create_subscription( + "topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::SharedPtr) {callback_count_1++;}); + size_t callback_count_2 = 0; + auto subscription_2 = + node_sub->create_subscription( + "topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::SharedPtr) {callback_count_2++;}); + EventsExecutor executor_sub; + executor_sub.add_node(node_sub); + + // Wait some time while messages are published + std::this_thread::sleep_for(10ms); + + // Destroy one of the two subscriptions + subscription_1.reset(); + + // Let subscriptions executor spin + executor_sub.spin_some(10ms); + + // The callback count of the destroyed subscription remained at 0 + EXPECT_EQ(0u, callback_count_1); + EXPECT_LT(0u, callback_count_2); + + executor_pub.cancel(); + spinner.join(); +} diff --git a/rclcpp/test/rclcpp/executors/test_events_executor_entities_collector.cpp b/rclcpp/test/rclcpp/executors/test_events_executor_entities_collector.cpp new file mode 100644 index 0000000000..3f11f983fe --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_executor_entities_collector.cpp @@ -0,0 +1,178 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rclcpp/rclcpp.hpp" + +#include "../../mocking_utils/patch.hpp" + +class TestEventsExecutorEntitiesCollector : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + dummy_executor_ = std::make_shared(); + entities_collector_ = + std::make_shared(dummy_executor_.get()); + } + + void TearDown() + { + rclcpp::shutdown(); + } + + rclcpp::executors::EventsExecutorEntitiesCollector::SharedPtr entities_collector_; + +private: + rclcpp::executors::EventsExecutor::SharedPtr dummy_executor_; +}; + +TEST_F(TestEventsExecutorEntitiesCollector, bad_init) +{ + EXPECT_THROW( + std::make_shared(nullptr), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_remove_node) +{ + auto node1 = std::make_shared("node1", "ns"); + EXPECT_NO_THROW(entities_collector_->add_node(node1->get_node_base_interface())); + + // Check adding second time + EXPECT_THROW( + entities_collector_->add_node(node1->get_node_base_interface()), + std::runtime_error); + + auto node2 = std::make_shared("node2", "ns"); + EXPECT_THROW( + entities_collector_->remove_node(node2->get_node_base_interface()), + std::runtime_error); + EXPECT_NO_THROW(entities_collector_->add_node(node2->get_node_base_interface())); + + EXPECT_NO_THROW(entities_collector_->remove_node(node1->get_node_base_interface())); + EXPECT_THROW( + entities_collector_->remove_node(node1->get_node_base_interface()), + std::runtime_error); + EXPECT_NO_THROW(entities_collector_->remove_node(node2->get_node_base_interface())); + + auto node3 = std::make_shared("node3", "ns"); + node3->get_node_base_interface()->get_associated_with_executor_atomic().exchange(true); + EXPECT_THROW( + entities_collector_->remove_node(node3->get_node_base_interface()), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_callback_group) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_callback_group_after_add_node) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_node(node->get_node_base_interface()); + EXPECT_THROW( + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, add_callback_group_twice) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); + cb_group->get_associated_with_executor_atomic().exchange(false); + EXPECT_THROW( + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, remove_callback_group_after_node) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); + + node.reset(); + + EXPECT_THROW( + entities_collector_->remove_callback_group(cb_group), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, remove_callback_group_twice) +{ + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); + ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); + + entities_collector_->remove_callback_group(cb_group); + + EXPECT_THROW( + entities_collector_->remove_callback_group(cb_group), + std::runtime_error); +} + +TEST_F(TestEventsExecutorEntitiesCollector, remove_node_opposite_order) +{ + auto node1 = std::make_shared("node1", "ns"); + EXPECT_NO_THROW(entities_collector_->add_node(node1->get_node_base_interface())); + + auto node2 = std::make_shared("node2", "ns"); + EXPECT_NO_THROW(entities_collector_->add_node(node2->get_node_base_interface())); + + EXPECT_NO_THROW(entities_collector_->remove_node(node2->get_node_base_interface())); +} + +TEST_F(TestEventsExecutorEntitiesCollector, test_rcl_exception) +{ + auto node1 = std::make_shared("node1", "ns"); + auto node2 = std::make_shared("node2", "ns"); + entities_collector_->add_node(node1->get_node_base_interface()); + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_guard_condition_set_listener_callback, RCL_RET_ERROR); + + EXPECT_THROW( + entities_collector_->add_node(node2->get_node_base_interface()), + std::runtime_error); + + EXPECT_THROW( + entities_collector_->remove_node(node1->get_node_base_interface()), + std::runtime_error); + } +} diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp new file mode 100644 index 0000000000..7996731ea2 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -0,0 +1,77 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include + +#include + +#include "rclcpp/experimental/buffers/simple_events_queue.hpp" + +using namespace std::chrono_literals; + +class TestEventsQueue : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestEventsQueue, SimpleQueueTest) +{ + // Create a SimpleEventsQueue and a local queue + auto simple_queue = std::make_unique(); + std::queue local_events_queue; + + // Make sure the queue is empty after init + simple_queue->init(); + EXPECT_TRUE(simple_queue->empty()); + + // Push 11 messages + for (int i = 0; i < 11; i++) { + rmw_listener_event_t stub_event; + simple_queue->push(stub_event); + } + + // Pop one message + simple_queue->pop(); + + local_events_queue = simple_queue->get_all_events(); + + // We should have (11 - 1) events in the local queue + size_t local_queue_size = local_events_queue.size(); + + // The local queue size should be 10 + EXPECT_EQ(10u, local_queue_size); + + // The simple queue should be empty after taking all events + EXPECT_TRUE(simple_queue->empty()); + + // Lets push an event into the queue and get it back + rmw_listener_event_t push_event = {simple_queue.get(), SUBSCRIPTION_EVENT}; + + simple_queue->push(push_event); + + rmw_listener_event_t front_event = simple_queue->front(); + + // The events should be equal + EXPECT_EQ(push_event.entity, front_event.entity); + EXPECT_EQ(push_event.type, front_event.type); +} diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index ad34273f68..bbb68298ee 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -90,7 +90,8 @@ using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::StaticSingleThreadedExecutor, + rclcpp::executors::EventsExecutor>; class ExecutorTypeNames { @@ -111,6 +112,10 @@ class ExecutorTypeNames return "StaticSingleThreadedExecutor"; } + if (std::is_same()) { + return "EventsExecutor"; + } + return ""; } }; @@ -124,7 +129,8 @@ TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames); using StandardExecutors = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, - rclcpp::executors::MultiThreadedExecutor>; + rclcpp::executors::MultiThreadedExecutor, + rclcpp::executors::EventsExecutor>; TYPED_TEST_SUITE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames); // Make sure that executors detach from nodes when destructing @@ -156,7 +162,7 @@ TYPED_TEST(TestExecutorsStable, addTemporaryNode) { // Sleep for a short time to verify executor.spin() is going, and didn't throw. std::thread spinner([&]() {EXPECT_NO_THROW(executor.spin());}); - std::this_thread::sleep_for(50ms); + std::this_thread::sleep_for(200ms); executor.cancel(); spinner.join(); } @@ -422,10 +428,12 @@ class TestWaitable : public rclcpp::Waitable add_to_wait_set(rcl_wait_set_t * wait_set) override { rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &gc_, NULL); - if (RCL_RET_OK != ret) { - return false; - } - ret = rcl_trigger_guard_condition(&gc_); + return RCL_RET_OK == ret; + } + + bool trigger() + { + rcl_ret_t ret = rcl_trigger_guard_condition(&gc_); return RCL_RET_OK == ret; } @@ -447,7 +455,7 @@ class TestWaitable : public rclcpp::Waitable { (void) data; count_++; - std::this_thread::sleep_for(1ms); + std::this_thread::sleep_for(3ms); } size_t @@ -459,6 +467,23 @@ class TestWaitable : public rclcpp::Waitable return count_; } + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + rmw_listener_callback_t executor_callback) const override + { + rcl_ret_t ret = rcl_guard_condition_set_listener_callback( + &gc_, + executor_callback, + executor, + this, + true /*Use previous events*/); + + if (RCL_RET_OK != ret) { + throw std::runtime_error(std::string("Couldn't set guard condition callback")); + } + } + private: size_t count_ = 0; rcl_guard_condition_t gc_; @@ -488,6 +513,7 @@ TYPED_TEST(TestExecutorsStable, spinAll) { !spin_exited && (std::chrono::steady_clock::now() - start < 1s)) { + my_waitable->trigger(); this->publisher->publish(test_msgs::msg::Empty()); std::this_thread::sleep_for(1ms); } @@ -521,14 +547,20 @@ TYPED_TEST(TestExecutorsStable, spinSome) { spin_exited = true; }); + // Give some time for executor to start spinning + // otherwise when it will start looking for work to do it will already find + // more than 1 notification + std::this_thread::sleep_for(10ms); + // Do some work until sufficient calls to the waitable occur, but keep going until either // count becomes too large, spin exits, or the 1 second timeout completes. auto start = std::chrono::steady_clock::now(); while ( - my_waitable->get_count() <= 1 && + my_waitable->get_count() <= 10 && !spin_exited && (std::chrono::steady_clock::now() - start < 1s)) { + my_waitable->trigger(); this->publisher->publish(test_msgs::msg::Empty()); std::this_thread::sleep_for(1ms); } @@ -572,6 +604,57 @@ TYPED_TEST(TestExecutorsStable, testSpinNodeUntilFutureCompleteNodePtr) { EXPECT_EQ(rclcpp::FutureReturnCode::SUCCESS, ret); } +TYPED_TEST(TestExecutorsStable, testSpinSomeWhileSpinning) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::thread spinner([&]() {executor.spin();}); + + // Wait to make sure thread started + do { + std::this_thread::sleep_for(5ms); + } while (!spinner.joinable()); + + EXPECT_THROW(executor.spin_some(1s), std::runtime_error); + + executor.cancel(); + spinner.join(); +} + +TYPED_TEST(TestExecutorsStable, testSpinAllWhileSpinning) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::thread spinner([&]() {executor.spin();}); + + // Wait to make sure thread started + do { + std::this_thread::sleep_for(5ms); + } while (!spinner.joinable()); + + EXPECT_THROW(executor.spin_all(1s), std::runtime_error); + + executor.cancel(); + spinner.join(); +} + +TYPED_TEST(TestExecutorsStable, testSpinOnceWhileSpinning) { + using ExecutorType = TypeParam; + ExecutorType executor; + + std::thread spinner([&]() {executor.spin();}); + + // Wait to make sure thread started + do { + std::this_thread::sleep_for(5ms); + } while (!spinner.joinable()); + + EXPECT_THROW(executor.spin_once(), std::runtime_error); + + executor.cancel(); + spinner.join(); +} + // Check spin_until_future_complete with node base pointer (instantiates its own executor) TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) { rclcpp::init(0, nullptr); diff --git a/rclcpp/test/rclcpp/executors/test_timers_manager.cpp b/rclcpp/test/rclcpp/executors/test_timers_manager.cpp new file mode 100644 index 0000000000..ccafcd9c47 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_timers_manager.cpp @@ -0,0 +1,435 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/contexts/default_context.hpp" +#include "rclcpp/executors/timers_manager.hpp" + +using namespace std::chrono_literals; + +using rclcpp::executors::TimersManager; + +using CallbackT = std::function; +using TimerT = rclcpp::WallTimer; + +class TestTimersManager : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestTimersManager, empty_manager) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + EXPECT_EQ(TimersManager::MAX_TIME, timers_manager->get_head_timeout()); + EXPECT_FALSE(timers_manager->execute_head_timer()); + EXPECT_NO_THROW(timers_manager->execute_ready_timers()); + EXPECT_NO_THROW(timers_manager->clear()); + EXPECT_NO_THROW(timers_manager->start()); + EXPECT_NO_THROW(timers_manager->stop()); +} + +TEST_F(TestTimersManager, max_time) +{ + // Timers manager max time should be a very big duration that does not cause overflow + + EXPECT_LT(std::chrono::hours(50).count(), TimersManager::MAX_TIME.count()); + auto tp = std::chrono::steady_clock::now() + TimersManager::MAX_TIME; + EXPECT_TRUE(std::chrono::steady_clock::now() < tp); +} + +TEST_F(TestTimersManager, add_run_remove_timer) +{ + size_t t_runs = 0; + auto t = TimerT::make_shared( + 1ms, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + std::weak_ptr t_weak = t; + + // Add the timer to the timers manager + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Sleep for more 3 times the timer period + std::this_thread::sleep_for(3ms); + + // The timer is executed only once, even if we slept 3 times the period + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t_runs); + + // Remove the timer from the manager + timers_manager->remove_timer(t); + + t.reset(); + // The timer is now not valid anymore + EXPECT_FALSE(t_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, clear) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t1 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + std::weak_ptr t1_weak = t1; + auto t2 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + std::weak_ptr t2_weak = t2; + + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + EXPECT_TRUE(t1_weak.lock() != nullptr); + EXPECT_TRUE(t2_weak.lock() != nullptr); + + timers_manager->clear(); + + t1.reset(); + t2.reset(); + + EXPECT_FALSE(t1_weak.lock() != nullptr); + EXPECT_FALSE(t2_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, remove_not_existing_timer) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + // Try to remove a nullptr timer + EXPECT_NO_THROW(timers_manager->remove_timer(nullptr)); + + auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Remove twice the same timer + timers_manager->remove_timer(t); + EXPECT_NO_THROW(timers_manager->remove_timer(t)); +} + +TEST_F(TestTimersManager, timers_thread_exclusive_usage) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + timers_manager->start(); + + EXPECT_THROW(timers_manager->start(), std::exception); + EXPECT_THROW(timers_manager->get_head_timeout(), std::exception); + EXPECT_THROW(timers_manager->execute_ready_timers(), std::exception); + EXPECT_THROW(timers_manager->execute_head_timer(), std::exception); + + timers_manager->stop(); + + EXPECT_NO_THROW(timers_manager->get_head_timeout()); + EXPECT_NO_THROW(timers_manager->execute_ready_timers()); + EXPECT_NO_THROW(timers_manager->execute_head_timer()); +} + +TEST_F(TestTimersManager, add_timer_twice) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + EXPECT_NO_THROW(timers_manager->add_timer(t)); +} + +TEST_F(TestTimersManager, add_nullptr) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + EXPECT_THROW(timers_manager->add_timer(nullptr), std::exception); +} + +TEST_F(TestTimersManager, head_not_ready) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t_runs = 0; + auto t = TimerT::make_shared( + 10s, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + + // Timer will take 10s to get ready, so nothing to execute here + bool ret = timers_manager->execute_head_timer(); + EXPECT_FALSE(ret); + EXPECT_EQ(0u, t_runs); +} + +TEST_F(TestTimersManager, timers_order) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 3ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t3_runs = 0; + auto t3 = TimerT::make_shared( + 10ms, + [&t3_runs]() { + t3_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers in a random order + timers_manager->add_timer(t2); + timers_manager->add_timer(t3); + timers_manager->add_timer(t1); + + std::this_thread::sleep_for(1ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t1_runs); + EXPECT_EQ(0u, t2_runs); + EXPECT_EQ(0u, t3_runs); + + std::this_thread::sleep_for(3ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(2u, t1_runs); + EXPECT_EQ(1u, t2_runs); + EXPECT_EQ(0u, t3_runs); + + std::this_thread::sleep_for(10ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(3u, t1_runs); + EXPECT_EQ(2u, t2_runs); + EXPECT_EQ(1u, t3_runs); + + timers_manager->remove_timer(t1); + + std::this_thread::sleep_for(3ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(3u, t1_runs); + EXPECT_EQ(3u, t2_runs); + EXPECT_EQ(1u, t3_runs); +} + +TEST_F(TestTimersManager, start_stop_timers_thread) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t = TimerT::make_shared(1ms, []() {}, rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Calling start multiple times will throw an error + EXPECT_NO_THROW(timers_manager->start()); + EXPECT_THROW(timers_manager->start(), std::exception); + + // Calling stop multiple times does not throw an error + EXPECT_NO_THROW(timers_manager->stop()); + EXPECT_NO_THROW(timers_manager->stop()); +} + +TEST_F(TestTimersManager, timers_thread) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + // Run timers thread for a while + timers_manager->start(); + std::this_thread::sleep_for(5ms); + timers_manager->stop(); + + EXPECT_LT(1u, t1_runs); + EXPECT_LT(1u, t2_runs); + EXPECT_EQ(t1_runs, t2_runs); +} + +TEST_F(TestTimersManager, destructor) +{ + size_t t_runs = 0; + auto t = TimerT::make_shared( + 1ms, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + std::weak_ptr t_weak = t; + + // When the timers manager is destroyed, it will stop the thread + // and clear the timers + { + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + + timers_manager->start(); + std::this_thread::sleep_for(3ms); + + EXPECT_LT(1u, t_runs); + } + + // The thread is not running anymore, so this value does not increase + size_t runs = t_runs; + std::this_thread::sleep_for(3ms); + EXPECT_EQ(runs, t_runs); + t.reset(); + EXPECT_FALSE(t_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, add_remove_while_thread_running) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers + timers_manager->add_timer(t1); + + // Start timers thread + timers_manager->start(); + + // After a while remove t1 and add t2 + std::this_thread::sleep_for(5ms); + timers_manager->remove_timer(t1); + size_t tmp_t1 = t1_runs; + timers_manager->add_timer(t2); + + // Wait some more time and then stop + std::this_thread::sleep_for(5ms); + timers_manager->stop(); + + // t1 has stopped running + EXPECT_EQ(tmp_t1, t1_runs); + // t2 is correctly running + EXPECT_LT(1u, t2_runs); +} + +TEST_F(TestTimersManager, infinite_loop) +{ + // This test makes sure that even if timers have a period shorter than the duration + // of their callback the functions never block indefinitely. + + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + std::this_thread::sleep_for(5ms); + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + std::this_thread::sleep_for(5ms); + }, + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + // Sleep for enough time to trigger timers + std::this_thread::sleep_for(3ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t1_runs); + EXPECT_EQ(1u, t2_runs); + + // Due to the long execution of timer callbacks, timers are already ready + bool ret = timers_manager->execute_head_timer(); + EXPECT_TRUE(ret); + EXPECT_EQ(3u, t1_runs + t2_runs); + + // Start a timers thread + timers_manager->start(); + std::this_thread::sleep_for(10ms); + timers_manager->stop(); + + EXPECT_LT(3u, t1_runs + t2_runs); + EXPECT_LT(1u, t1_runs); + EXPECT_LT(1u, t2_runs); +} diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 5c63fd5962..0d9da31613 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -54,7 +54,8 @@ using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::StaticSingleThreadedExecutor, + rclcpp::executors::EventsExecutor>; class ExecutorTypeNames { @@ -75,6 +76,10 @@ class ExecutorTypeNames return "StaticSingleThreadedExecutor"; } + if (std::is_same()) { + return "EventsExecutor"; + } + return ""; } }; @@ -162,7 +167,8 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) { */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( @@ -180,7 +186,8 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups) */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_to_executor) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); executor.add_node(node->get_node_base_interface()); ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); @@ -214,13 +221,14 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_t */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups) { + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( 2s, timer_callback, cb_grp); - rclcpp::executors::MultiThreadedExecutor executor; executor.add_callback_group(cb_grp, node->get_node_base_interface()); ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); @@ -249,14 +257,15 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups) */ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_executors) { + using ExecutorType = TypeParam; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( 2s, timer_callback, cb_grp); - rclcpp::executors::MultiThreadedExecutor timer_executor; - rclcpp::executors::MultiThreadedExecutor sub_executor; + ExecutorType timer_executor; + ExecutorType sub_executor; timer_executor.add_callback_group(cb_grp, node->get_node_base_interface()); const rclcpp::QoS qos(10); auto options = rclcpp::SubscriptionOptions(); @@ -285,7 +294,8 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_e */ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_group) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(