Skip to content

Commit 2228e75

Browse files
mjcarrollclalancettewjwwood
authored andcommitted
Utilize rclcpp::WaitSet as part of the executors (ros2#2142)
* Deprecate callback_group call taking context Signed-off-by: Michael Carroll <[email protected]> * Add base executor objects that can be used by implementors Signed-off-by: Michael Carroll <[email protected]> * Template common operations Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback: * Add callback to EntitiesCollector constructor * Make function to check automatically added callback groups take a list Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback and fix templates Signed-off-by: Michael Carroll <[email protected]> * Lint and docs Signed-off-by: Michael Carroll <[email protected]> * Make executor own the notify waitable Signed-off-by: Michael Carroll <[email protected]> * Add pending queue to collector, remove from waitable Also change node's get_guard_condition to return shared_ptr Signed-off-by: Michael Carroll <[email protected]> * Change interrupt guard condition to shared_ptr Check if guard condition is valid before adding it to the waitable Signed-off-by: Michael Carroll <[email protected]> * Lint and docs Signed-off-by: Michael Carroll <[email protected]> * Utilize rclcpp::WaitSet as part of the executors Signed-off-by: Michael Carroll <[email protected]> * Don't exchange atomic twice Signed-off-by: Michael Carroll <[email protected]> * Fix add_node and add more tests Signed-off-by: Michael Carroll <[email protected]> * Make get_notify_guard_condition follow API tick-tock Signed-off-by: Michael Carroll <[email protected]> * Improve callback group tick-tocking Signed-off-by: Michael Carroll <[email protected]> * Don't lock twice Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback Signed-off-by: Michael Carroll <[email protected]> * Add thread safety annotations and make locks consistent Signed-off-by: Michael Carroll <[email protected]> * @wip Signed-off-by: Michael Carroll <[email protected]> * Reset callback groups for multithreaded executor Signed-off-by: Michael Carroll <[email protected]> * Avoid many small function calls when building executables Signed-off-by: Michael Carroll <[email protected]> * Re-trigger guard condition if buffer has data Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback Signed-off-by: Michael Carroll <[email protected]> * Trace points Signed-off-by: Michael Carroll <[email protected]> * Remove tracepoints Signed-off-by: Michael Carroll <[email protected]> * Reducing diff Signed-off-by: Michael Carroll <[email protected]> * Reduce diff Signed-off-by: Michael Carroll <[email protected]> * Uncrustify Signed-off-by: Michael Carroll <[email protected]> * Restore tests Signed-off-by: Michael Carroll <[email protected]> * Back to weak_ptr and reduce test time Signed-off-by: Michael Carroll <[email protected]> * reduce diff and lint Signed-off-by: Michael Carroll <[email protected]> * Restore static single threaded tests that weren't working before Signed-off-by: Michael Carroll <[email protected]> * Restore more tests Signed-off-by: Michael Carroll <[email protected]> * Fix multithreaded test Signed-off-by: Michael Carroll <[email protected]> * Fix assert Signed-off-by: Michael Carroll <[email protected]> * Fix constructor test Signed-off-by: Michael Carroll <[email protected]> * Change ready_executables signature back Signed-off-by: Michael Carroll <[email protected]> * Don't enforce removing callback groups before nodes Signed-off-by: Michael Carroll <[email protected]> * Remove the "add_valid_node" API Signed-off-by: Michael Carroll <[email protected]> * Only notify if the trigger condition is valid Signed-off-by: Michael Carroll <[email protected]> * Only trigger if valid and needed Signed-off-by: Michael Carroll <[email protected]> * Fix spin_some/spin_all implementation Signed-off-by: Michael Carroll <[email protected]> * Restore single threaded executor Signed-off-by: Michael Carroll <[email protected]> * Picking ABI-incompatible executor changes Signed-off-by: Michael Carroll <[email protected]> * Add PIMPL Signed-off-by: Michael Carroll <[email protected]> * Additional waitset prune Signed-off-by: Michael Carroll <[email protected]> * Fix bad merge Signed-off-by: Michael Carroll <[email protected]> * Expand test timeout Signed-off-by: Michael Carroll <[email protected]> * Introduce method to clear expired entities from a collection Signed-off-by: Michael Carroll <[email protected]> * Make sure to call remove_expired_entities(). Signed-off-by: Chris Lalancette <[email protected]> * Prune queued work when callback group is removed Signed-off-by: Michael Carroll <[email protected]> * Prune subscriptions from dynamic storage Signed-off-by: Michael Carroll <[email protected]> * Styles fixes. Signed-off-by: Chris Lalancette <[email protected]> * Re-trigger guard conditions Signed-off-by: Michael Carroll <[email protected]> * Condense to just use watiable.take_data Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Address reviewer comments (nits) Signed-off-by: Michael Carroll <[email protected]> * Lock mutex when copying Signed-off-by: Michael Carroll <[email protected]> * Refactors to static single threaded based on reviewers Signed-off-by: Michael Carroll <[email protected]> * More small refactoring Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Add ready executable accessors to WaitResult Signed-off-by: Michael Carroll <[email protected]> * Make use of accessors from wait_set Signed-off-by: Michael Carroll <[email protected]> * Fix tests Signed-off-by: Michael Carroll <[email protected]> * Fix more tests Signed-off-by: Michael Carroll <[email protected]> * Tidy up single threaded executor implementation Signed-off-by: Michael Carroll <[email protected]> * Don't null out timer, rely on call Signed-off-by: Michael Carroll <[email protected]> * change how timers are checked from wait result in executors Signed-off-by: William Woodall <[email protected]> * peak -> peek Signed-off-by: William Woodall <[email protected]> * fix bug in next_waitable logic Signed-off-by: William Woodall <[email protected]> * fix bug in StaticSTE that broke the add callback groups to executor tests Signed-off-by: William Woodall <[email protected]> * style Signed-off-by: William Woodall <[email protected]> --------- Signed-off-by: Michael Carroll <[email protected]> Signed-off-by: Michael Carroll <[email protected]> Signed-off-by: Chris Lalancette <[email protected]> Signed-off-by: William Woodall <[email protected]> Co-authored-by: Chris Lalancette <[email protected]> Co-authored-by: William Woodall <[email protected]>
1 parent 77a6992 commit 2228e75

31 files changed

+976
-1537
lines changed

rclcpp/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ set(${PROJECT_NAME}_SRCS
6464
src/rclcpp/executors/executor_notify_waitable.cpp
6565
src/rclcpp/executors/multi_threaded_executor.cpp
6666
src/rclcpp/executors/single_threaded_executor.cpp
67-
src/rclcpp/executors/static_executor_entities_collector.cpp
6867
src/rclcpp/executors/static_single_threaded_executor.cpp
6968
src/rclcpp/expand_topic_or_service_name.cpp
7069
src/rclcpp/experimental/executors/events_executor/events_executor.cpp

rclcpp/include/rclcpp/any_executable.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ struct AnyExecutable
4545
rclcpp::ClientBase::SharedPtr client;
4646
rclcpp::Waitable::SharedPtr waitable;
4747
// These are used to keep the scope on the containing items
48-
rclcpp::CallbackGroup::SharedPtr callback_group;
49-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base;
50-
std::shared_ptr<void> data;
48+
rclcpp::CallbackGroup::SharedPtr callback_group {nullptr};
49+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};
50+
std::shared_ptr<void> data {nullptr};
5151
};
5252

5353
} // namespace rclcpp

rclcpp/include/rclcpp/callback_group.hpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,18 +185,41 @@ class CallbackGroup
185185
* \return the number of entities in the callback group.
186186
*/
187187
RCLCPP_PUBLIC
188-
size_t size() const;
188+
size_t
189+
size() const;
189190

191+
/// Return a reference to the 'can be taken' atomic boolean.
192+
/**
193+
* The resulting bool will be true in the case that no executor is currently
194+
* using an executable entity from this group.
195+
* The resulting bool will be false in the case that an executor is currently
196+
* using an executable entity from this group, and the group policy doesn't
197+
* allow a second take (eg mutual exclusion)
198+
* \return a reference to the flag
199+
*/
190200
RCLCPP_PUBLIC
191201
std::atomic_bool &
192202
can_be_taken_from();
193203

204+
/// Get the group type.
205+
/**
206+
* \return the group type
207+
*/
194208
RCLCPP_PUBLIC
195209
const CallbackGroupType &
196210
type() const;
197211

212+
/// Collect all of the entity pointers contained in this callback group.
213+
/**
214+
* \param[in] sub_func Function to execute for each subscription
215+
* \param[in] service_func Function to execute for each service
216+
* \param[in] client_func Function to execute for each client
217+
* \param[in] timer_func Function to execute for each timer
218+
* \param[in] waitable_fuinc Function to execute for each waitable
219+
*/
198220
RCLCPP_PUBLIC
199-
void collect_all_ptrs(
221+
void
222+
collect_all_ptrs(
200223
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
201224
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
202225
std::function<void(const rclcpp::ClientBase::SharedPtr &)> client_func,

rclcpp/include/rclcpp/executor.hpp

Lines changed: 29 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,24 @@
2929

3030
#include "rcl/guard_condition.h"
3131
#include "rcl/wait.h"
32+
#include "rclcpp/executors/executor_notify_waitable.hpp"
3233
#include "rcpputils/scope_exit.hpp"
3334

3435
#include "rclcpp/context.hpp"
3536
#include "rclcpp/contexts/default_context.hpp"
3637
#include "rclcpp/guard_condition.hpp"
3738
#include "rclcpp/executor_options.hpp"
39+
#include "rclcpp/executors/executor_entities_collection.hpp"
40+
#include "rclcpp/executors/executor_entities_collector.hpp"
3841
#include "rclcpp/future_return_code.hpp"
39-
#include "rclcpp/memory_strategies.hpp"
40-
#include "rclcpp/memory_strategy.hpp"
4142
#include "rclcpp/node_interfaces/node_base_interface.hpp"
4243
#include "rclcpp/utilities.hpp"
4344
#include "rclcpp/visibility_control.hpp"
45+
#include "rclcpp/wait_set.hpp"
4446

4547
namespace rclcpp
4648
{
4749

48-
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
49-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
50-
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
51-
5250
// Forward declaration is used in convenience method signature.
5351
class Node;
5452
class ExecutorImplementation;
@@ -403,17 +401,6 @@ class Executor
403401
void
404402
cancel();
405403

406-
/// Support dynamic switching of the memory strategy.
407-
/**
408-
* Switching the memory strategy while the executor is spinning in another threading could have
409-
* unintended consequences.
410-
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
411-
* \throws std::runtime_error if memory_strategy is null
412-
*/
413-
RCLCPP_PUBLIC
414-
void
415-
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
416-
417404
/// Returns true if the executor is currently spinning.
418405
/**
419406
* This function can be called asynchronously from any thread.
@@ -498,6 +485,11 @@ class Executor
498485
static void
499486
execute_client(rclcpp::ClientBase::SharedPtr client);
500487

488+
/// Gather all of the waitable entities from associated nodes and callback groups.
489+
RCLCPP_PUBLIC
490+
void
491+
collect_entities();
492+
501493
/// Block until more work becomes avilable or timeout is reached.
502494
/**
503495
* Builds a set of waitable entities, which are passed to the middleware.
@@ -509,62 +501,6 @@ class Executor
509501
void
510502
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
511503

512-
/// Find node associated with a callback group
513-
/**
514-
* \param[in] weak_groups_to_nodes map of callback groups to nodes
515-
* \param[in] group callback group to find assocatiated node
516-
* \return Pointer to associated node if found, else nullptr
517-
*/
518-
RCLCPP_PUBLIC
519-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
520-
get_node_by_group(
521-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
522-
rclcpp::CallbackGroup::SharedPtr group);
523-
524-
/// Return true if the node has been added to this executor.
525-
/**
526-
* \param[in] node_ptr a shared pointer that points to a node base interface
527-
* \param[in] weak_groups_to_nodes map to nodes to lookup
528-
* \return true if the node is associated with the executor, otherwise false
529-
*/
530-
RCLCPP_PUBLIC
531-
bool
532-
has_node(
533-
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
534-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
535-
536-
/// Find the callback group associated with a timer
537-
/**
538-
* \param[in] timer Timer to find associated callback group
539-
* \return Pointer to callback group node if found, else nullptr
540-
*/
541-
RCLCPP_PUBLIC
542-
rclcpp::CallbackGroup::SharedPtr
543-
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
544-
545-
/// Add a callback group to an executor
546-
/**
547-
* \see rclcpp::Executor::add_callback_group
548-
*/
549-
RCLCPP_PUBLIC
550-
virtual void
551-
add_callback_group_to_map(
552-
rclcpp::CallbackGroup::SharedPtr group_ptr,
553-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
554-
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
555-
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
556-
557-
/// Remove a callback group from the executor.
558-
/**
559-
* \see rclcpp::Executor::remove_callback_group
560-
*/
561-
RCLCPP_PUBLIC
562-
virtual void
563-
remove_callback_group_from_map(
564-
rclcpp::CallbackGroup::SharedPtr group_ptr,
565-
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
566-
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
567-
568504
/// Check for executable in ready state and populate union structure.
569505
/**
570506
* \param[out] any_executable populated union structure of ready executable
@@ -575,33 +511,6 @@ class Executor
575511
bool
576512
get_next_ready_executable(AnyExecutable & any_executable);
577513

578-
/// Check for executable in ready state and populate union structure.
579-
/**
580-
* This is the implementation of get_next_ready_executable that takes into
581-
* account the current state of callback groups' association with nodes and
582-
* executors.
583-
*
584-
* This checks in a particular order for available work:
585-
* * Timers
586-
* * Subscriptions
587-
* * Services
588-
* * Clients
589-
* * Waitable
590-
*
591-
* If the next executable is not associated with this executor/node pair,
592-
* then this method will return false.
593-
*
594-
* \param[out] any_executable populated union structure of ready executable
595-
* \param[in] weak_groups_to_nodes mapping of callback groups to nodes
596-
* \return true if an executable was ready and any_executable was populated,
597-
* otherwise false
598-
*/
599-
RCLCPP_PUBLIC
600-
bool
601-
get_next_ready_executable_from_map(
602-
AnyExecutable & any_executable,
603-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
604-
605514
/// Wait for executable in ready state and populate union structure.
606515
/**
607516
* If an executable is ready, it will return immediately, otherwise
@@ -619,21 +528,6 @@ class Executor
619528
AnyExecutable & any_executable,
620529
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
621530

622-
/// Add all callback groups that can be automatically added from associated nodes.
623-
/**
624-
* The executor, before collecting entities, verifies if any callback group from
625-
* nodes associated with the executor, which is not already associated to an executor,
626-
* can be automatically added to this executor.
627-
* This takes care of any callback group that has been added to a node but not explicitly added
628-
* to the executor.
629-
* It is important to note that in order for the callback groups to be automatically added to an
630-
* executor through this function, the node of the callback groups needs to have been added
631-
* through the `add_node` method.
632-
*/
633-
RCLCPP_PUBLIC
634-
virtual void
635-
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
636-
637531
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
638532
std::atomic_bool spinning;
639533

@@ -643,16 +537,8 @@ class Executor
643537
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
644538
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
645539

646-
/// Wait set for managing entities that the rmw layer waits on.
647-
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
648-
649-
// Mutex to protect the subsequent memory_strategy_.
650540
mutable std::mutex mutex_;
651541

652-
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
653-
memory_strategy::MemoryStrategy::SharedPtr
654-
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
655-
656542
/// The context associated with this executor.
657543
std::shared_ptr<rclcpp::Context> context_;
658544

@@ -662,39 +548,31 @@ class Executor
662548
virtual void
663549
spin_once_impl(std::chrono::nanoseconds timeout);
664550

665-
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
666-
const rclcpp::GuardCondition *,
667-
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
668-
WeakNodesToGuardConditionsMap;
669-
670-
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
671-
const rclcpp::GuardCondition *,
672-
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
673-
WeakCallbackGroupsToGuardConditionsMap;
674-
675-
/// maps nodes to guard conditions
676-
WeakNodesToGuardConditionsMap
677-
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
551+
/// Waitable containing guard conditions controlling the executor flow.
552+
/**
553+
* This waitable contains the interrupt and shutdown guard condition, as well
554+
* as the guard condition associated with each node and callback group.
555+
* By default, if any change is detected in the monitored entities, the notify
556+
* waitable will awake the executor and rebuild the collections.
557+
*/
558+
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
678559

679-
/// maps callback groups to guard conditions
680-
WeakCallbackGroupsToGuardConditionsMap
681-
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
560+
std::atomic_bool entities_need_rebuild_;
682561

683-
/// maps callback groups associated to nodes
684-
WeakCallbackGroupsToNodesMap
685-
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
562+
/// Collector used to associate executable entities from nodes and guard conditions
563+
rclcpp::executors::ExecutorEntitiesCollector collector_;
686564

687-
/// maps callback groups to nodes associated with executor
688-
WeakCallbackGroupsToNodesMap
689-
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
565+
/// WaitSet to be waited on.
566+
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
567+
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
690568

691-
/// maps all callback groups to nodes
692-
WeakCallbackGroupsToNodesMap
693-
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
569+
/// Hold the current state of the collection being waited on by the waitset
570+
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
571+
mutex_);
694572

695-
/// nodes that are associated with the executor
696-
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
697-
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
573+
/// Hold the current state of the notify waitable being waited on by the waitset
574+
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_
575+
RCPPUTILS_TSA_GUARDED_BY(mutex_);
698576

699577
/// shutdown callback handle registered to Context
700578
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;

rclcpp/include/rclcpp/executors/executor_entities_collection.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ struct ExecutorEntitiesCollection
178178

179179
/// Clear the entities collection
180180
void clear();
181+
182+
/// Remove entities that have expired weak ownership
183+
/**
184+
* \return The total number of removed entities
185+
*/
186+
size_t remove_expired_entities();
181187
};
182188

183189
/// Build an entities collection from callback groups

rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
4848
~ExecutorNotifyWaitable() override = default;
4949

5050
RCLCPP_PUBLIC
51-
ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other);
51+
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);
5252

5353

5454
RCLCPP_PUBLIC
55-
ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other);
55+
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);
5656

5757
/// Add conditions to the wait set
5858
/**

0 commit comments

Comments
 (0)