Skip to content

Commit 5632a09

Browse files
mjcarrollclalancettewjwwood
authored
Utilize rclcpp::WaitSet as part of the executors (#2142)
* Deprecate callback_group call taking context Signed-off-by: Michael Carroll <[email protected]> * Add base executor objects that can be used by implementors Signed-off-by: Michael Carroll <[email protected]> * Template common operations Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback: * Add callback to EntitiesCollector constructor * Make function to check automatically added callback groups take a list Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback and fix templates Signed-off-by: Michael Carroll <[email protected]> * Lint and docs Signed-off-by: Michael Carroll <[email protected]> * Make executor own the notify waitable Signed-off-by: Michael Carroll <[email protected]> * Add pending queue to collector, remove from waitable Also change node's get_guard_condition to return shared_ptr Signed-off-by: Michael Carroll <[email protected]> * Change interrupt guard condition to shared_ptr Check if guard condition is valid before adding it to the waitable Signed-off-by: Michael Carroll <[email protected]> * Lint and docs Signed-off-by: Michael Carroll <[email protected]> * Utilize rclcpp::WaitSet as part of the executors Signed-off-by: Michael Carroll <[email protected]> * Don't exchange atomic twice Signed-off-by: Michael Carroll <[email protected]> * Fix add_node and add more tests Signed-off-by: Michael Carroll <[email protected]> * Make get_notify_guard_condition follow API tick-tock Signed-off-by: Michael Carroll <[email protected]> * Improve callback group tick-tocking Signed-off-by: Michael Carroll <[email protected]> * Don't lock twice Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback Signed-off-by: Michael Carroll <[email protected]> * Add thread safety annotations and make locks consistent Signed-off-by: Michael Carroll <[email protected]> * @wip Signed-off-by: Michael Carroll <[email protected]> * Reset callback groups for multithreaded executor Signed-off-by: Michael Carroll <[email protected]> * Avoid many small function calls when building executables Signed-off-by: Michael Carroll <[email protected]> * Re-trigger guard condition if buffer has data Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback Signed-off-by: Michael Carroll <[email protected]> * Trace points Signed-off-by: Michael Carroll <[email protected]> * Remove tracepoints Signed-off-by: Michael Carroll <[email protected]> * Reducing diff Signed-off-by: Michael Carroll <[email protected]> * Reduce diff Signed-off-by: Michael Carroll <[email protected]> * Uncrustify Signed-off-by: Michael Carroll <[email protected]> * Restore tests Signed-off-by: Michael Carroll <[email protected]> * Back to weak_ptr and reduce test time Signed-off-by: Michael Carroll <[email protected]> * reduce diff and lint Signed-off-by: Michael Carroll <[email protected]> * Restore static single threaded tests that weren't working before Signed-off-by: Michael Carroll <[email protected]> * Restore more tests Signed-off-by: Michael Carroll <[email protected]> * Fix multithreaded test Signed-off-by: Michael Carroll <[email protected]> * Fix assert Signed-off-by: Michael Carroll <[email protected]> * Fix constructor test Signed-off-by: Michael Carroll <[email protected]> * Change ready_executables signature back Signed-off-by: Michael Carroll <[email protected]> * Don't enforce removing callback groups before nodes Signed-off-by: Michael Carroll <[email protected]> * Remove the "add_valid_node" API Signed-off-by: Michael Carroll <[email protected]> * Only notify if the trigger condition is valid Signed-off-by: Michael Carroll <[email protected]> * Only trigger if valid and needed Signed-off-by: Michael Carroll <[email protected]> * Fix spin_some/spin_all implementation Signed-off-by: Michael Carroll <[email protected]> * Restore single threaded executor Signed-off-by: Michael Carroll <[email protected]> * Picking ABI-incompatible executor changes Signed-off-by: Michael Carroll <[email protected]> * Add PIMPL Signed-off-by: Michael Carroll <[email protected]> * Additional waitset prune Signed-off-by: Michael Carroll <[email protected]> * Fix bad merge Signed-off-by: Michael Carroll <[email protected]> * Expand test timeout Signed-off-by: Michael Carroll <[email protected]> * Introduce method to clear expired entities from a collection Signed-off-by: Michael Carroll <[email protected]> * Make sure to call remove_expired_entities(). Signed-off-by: Chris Lalancette <[email protected]> * Prune queued work when callback group is removed Signed-off-by: Michael Carroll <[email protected]> * Prune subscriptions from dynamic storage Signed-off-by: Michael Carroll <[email protected]> * Styles fixes. Signed-off-by: Chris Lalancette <[email protected]> * Re-trigger guard conditions Signed-off-by: Michael Carroll <[email protected]> * Condense to just use watiable.take_data Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Address reviewer comments (nits) Signed-off-by: Michael Carroll <[email protected]> * Lock mutex when copying Signed-off-by: Michael Carroll <[email protected]> * Refactors to static single threaded based on reviewers Signed-off-by: Michael Carroll <[email protected]> * More small refactoring Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Add ready executable accessors to WaitResult Signed-off-by: Michael Carroll <[email protected]> * Make use of accessors from wait_set Signed-off-by: Michael Carroll <[email protected]> * Fix tests Signed-off-by: Michael Carroll <[email protected]> * Fix more tests Signed-off-by: Michael Carroll <[email protected]> * Tidy up single threaded executor implementation Signed-off-by: Michael Carroll <[email protected]> * Don't null out timer, rely on call Signed-off-by: Michael Carroll <[email protected]> * change how timers are checked from wait result in executors Signed-off-by: William Woodall <[email protected]> * peak -> peek Signed-off-by: William Woodall <[email protected]> * fix bug in next_waitable logic Signed-off-by: William Woodall <[email protected]> * fix bug in StaticSTE that broke the add callback groups to executor tests Signed-off-by: William Woodall <[email protected]> * style Signed-off-by: William Woodall <[email protected]> --------- Signed-off-by: Michael Carroll <[email protected]> Signed-off-by: Michael Carroll <[email protected]> Signed-off-by: Chris Lalancette <[email protected]> Signed-off-by: William Woodall <[email protected]> Co-authored-by: Chris Lalancette <[email protected]> Co-authored-by: William Woodall <[email protected]>
1 parent 9d5aaf5 commit 5632a09

32 files changed

+975
-2061
lines changed

rclcpp/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ set(${PROJECT_NAME}_SRCS
6969
src/rclcpp/executors/executor_notify_waitable.cpp
7070
src/rclcpp/executors/multi_threaded_executor.cpp
7171
src/rclcpp/executors/single_threaded_executor.cpp
72-
src/rclcpp/executors/static_executor_entities_collector.cpp
7372
src/rclcpp/executors/static_single_threaded_executor.cpp
7473
src/rclcpp/expand_topic_or_service_name.cpp
7574
src/rclcpp/experimental/executors/events_executor/events_executor.cpp

rclcpp/include/rclcpp/any_executable.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ struct AnyExecutable
4545
rclcpp::ClientBase::SharedPtr client;
4646
rclcpp::Waitable::SharedPtr waitable;
4747
// These are used to keep the scope on the containing items
48-
rclcpp::CallbackGroup::SharedPtr callback_group;
49-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base;
50-
std::shared_ptr<void> data;
48+
rclcpp::CallbackGroup::SharedPtr callback_group {nullptr};
49+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};
50+
std::shared_ptr<void> data {nullptr};
5151
};
5252

5353
} // namespace rclcpp

rclcpp/include/rclcpp/callback_group.hpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,41 @@ class CallbackGroup
184184
* \return the number of entities in the callback group.
185185
*/
186186
RCLCPP_PUBLIC
187-
size_t size() const;
187+
size_t
188+
size() const;
188189

190+
/// Return a reference to the 'can be taken' atomic boolean.
191+
/**
192+
* The resulting bool will be true in the case that no executor is currently
193+
* using an executable entity from this group.
194+
* The resulting bool will be false in the case that an executor is currently
195+
* using an executable entity from this group, and the group policy doesn't
196+
* allow a second take (eg mutual exclusion)
197+
* \return a reference to the flag
198+
*/
189199
RCLCPP_PUBLIC
190200
std::atomic_bool &
191201
can_be_taken_from();
192202

203+
/// Get the group type.
204+
/**
205+
* \return the group type
206+
*/
193207
RCLCPP_PUBLIC
194208
const CallbackGroupType &
195209
type() const;
196210

211+
/// Collect all of the entity pointers contained in this callback group.
212+
/**
213+
* \param[in] sub_func Function to execute for each subscription
214+
* \param[in] service_func Function to execute for each service
215+
* \param[in] client_func Function to execute for each client
216+
* \param[in] timer_func Function to execute for each timer
217+
* \param[in] waitable_fuinc Function to execute for each waitable
218+
*/
197219
RCLCPP_PUBLIC
198-
void collect_all_ptrs(
220+
void
221+
collect_all_ptrs(
199222
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
200223
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
201224
std::function<void(const rclcpp::ClientBase::SharedPtr &)> client_func,

rclcpp/include/rclcpp/executor.hpp

Lines changed: 29 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,24 @@
2929

3030
#include "rcl/guard_condition.h"
3131
#include "rcl/wait.h"
32+
#include "rclcpp/executors/executor_notify_waitable.hpp"
3233
#include "rcpputils/scope_exit.hpp"
3334

3435
#include "rclcpp/context.hpp"
3536
#include "rclcpp/contexts/default_context.hpp"
3637
#include "rclcpp/guard_condition.hpp"
3738
#include "rclcpp/executor_options.hpp"
39+
#include "rclcpp/executors/executor_entities_collection.hpp"
40+
#include "rclcpp/executors/executor_entities_collector.hpp"
3841
#include "rclcpp/future_return_code.hpp"
39-
#include "rclcpp/memory_strategies.hpp"
40-
#include "rclcpp/memory_strategy.hpp"
4142
#include "rclcpp/node_interfaces/node_base_interface.hpp"
4243
#include "rclcpp/utilities.hpp"
4344
#include "rclcpp/visibility_control.hpp"
45+
#include "rclcpp/wait_set.hpp"
4446

4547
namespace rclcpp
4648
{
4749

48-
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
49-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
50-
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
51-
5250
// Forward declaration is used in convenience method signature.
5351
class Node;
5452
class ExecutorImplementation;
@@ -425,17 +423,6 @@ class Executor
425423
void
426424
cancel();
427425

428-
/// Support dynamic switching of the memory strategy.
429-
/**
430-
* Switching the memory strategy while the executor is spinning in another threading could have
431-
* unintended consequences.
432-
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
433-
* \throws std::runtime_error if memory_strategy is null
434-
*/
435-
RCLCPP_PUBLIC
436-
void
437-
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
438-
439426
/// Returns true if the executor is currently spinning.
440427
/**
441428
* This function can be called asynchronously from any thread.
@@ -520,6 +507,11 @@ class Executor
520507
static void
521508
execute_client(rclcpp::ClientBase::SharedPtr client);
522509

510+
/// Gather all of the waitable entities from associated nodes and callback groups.
511+
RCLCPP_PUBLIC
512+
void
513+
collect_entities();
514+
523515
/// Block until more work becomes avilable or timeout is reached.
524516
/**
525517
* Builds a set of waitable entities, which are passed to the middleware.
@@ -531,62 +523,6 @@ class Executor
531523
void
532524
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
533525

534-
/// Find node associated with a callback group
535-
/**
536-
* \param[in] weak_groups_to_nodes map of callback groups to nodes
537-
* \param[in] group callback group to find assocatiated node
538-
* \return Pointer to associated node if found, else nullptr
539-
*/
540-
RCLCPP_PUBLIC
541-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
542-
get_node_by_group(
543-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
544-
rclcpp::CallbackGroup::SharedPtr group);
545-
546-
/// Return true if the node has been added to this executor.
547-
/**
548-
* \param[in] node_ptr a shared pointer that points to a node base interface
549-
* \param[in] weak_groups_to_nodes map to nodes to lookup
550-
* \return true if the node is associated with the executor, otherwise false
551-
*/
552-
RCLCPP_PUBLIC
553-
bool
554-
has_node(
555-
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
556-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
557-
558-
/// Find the callback group associated with a timer
559-
/**
560-
* \param[in] timer Timer to find associated callback group
561-
* \return Pointer to callback group node if found, else nullptr
562-
*/
563-
RCLCPP_PUBLIC
564-
rclcpp::CallbackGroup::SharedPtr
565-
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
566-
567-
/// Add a callback group to an executor
568-
/**
569-
* \see rclcpp::Executor::add_callback_group
570-
*/
571-
RCLCPP_PUBLIC
572-
virtual void
573-
add_callback_group_to_map(
574-
rclcpp::CallbackGroup::SharedPtr group_ptr,
575-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
576-
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
577-
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
578-
579-
/// Remove a callback group from the executor.
580-
/**
581-
* \see rclcpp::Executor::remove_callback_group
582-
*/
583-
RCLCPP_PUBLIC
584-
virtual void
585-
remove_callback_group_from_map(
586-
rclcpp::CallbackGroup::SharedPtr group_ptr,
587-
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
588-
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
589-
590526
/// Check for executable in ready state and populate union structure.
591527
/**
592528
* \param[out] any_executable populated union structure of ready executable
@@ -597,33 +533,6 @@ class Executor
597533
bool
598534
get_next_ready_executable(AnyExecutable & any_executable);
599535

600-
/// Check for executable in ready state and populate union structure.
601-
/**
602-
* This is the implementation of get_next_ready_executable that takes into
603-
* account the current state of callback groups' association with nodes and
604-
* executors.
605-
*
606-
* This checks in a particular order for available work:
607-
* * Timers
608-
* * Subscriptions
609-
* * Services
610-
* * Clients
611-
* * Waitable
612-
*
613-
* If the next executable is not associated with this executor/node pair,
614-
* then this method will return false.
615-
*
616-
* \param[out] any_executable populated union structure of ready executable
617-
* \param[in] weak_groups_to_nodes mapping of callback groups to nodes
618-
* \return true if an executable was ready and any_executable was populated,
619-
* otherwise false
620-
*/
621-
RCLCPP_PUBLIC
622-
bool
623-
get_next_ready_executable_from_map(
624-
AnyExecutable & any_executable,
625-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
626-
627536
/// Wait for executable in ready state and populate union structure.
628537
/**
629538
* If an executable is ready, it will return immediately, otherwise
@@ -641,21 +550,6 @@ class Executor
641550
AnyExecutable & any_executable,
642551
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
643552

644-
/// Add all callback groups that can be automatically added from associated nodes.
645-
/**
646-
* The executor, before collecting entities, verifies if any callback group from
647-
* nodes associated with the executor, which is not already associated to an executor,
648-
* can be automatically added to this executor.
649-
* This takes care of any callback group that has been added to a node but not explicitly added
650-
* to the executor.
651-
* It is important to note that in order for the callback groups to be automatically added to an
652-
* executor through this function, the node of the callback groups needs to have been added
653-
* through the `add_node` method.
654-
*/
655-
RCLCPP_PUBLIC
656-
virtual void
657-
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
658-
659553
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
660554
std::atomic_bool spinning;
661555

@@ -665,16 +559,8 @@ class Executor
665559
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
666560
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
667561

668-
/// Wait set for managing entities that the rmw layer waits on.
669-
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
670-
671-
// Mutex to protect the subsequent memory_strategy_.
672562
mutable std::mutex mutex_;
673563

674-
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
675-
memory_strategy::MemoryStrategy::SharedPtr
676-
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
677-
678564
/// The context associated with this executor.
679565
std::shared_ptr<rclcpp::Context> context_;
680566

@@ -684,39 +570,31 @@ class Executor
684570
virtual void
685571
spin_once_impl(std::chrono::nanoseconds timeout);
686572

687-
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
688-
const rclcpp::GuardCondition *,
689-
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
690-
WeakNodesToGuardConditionsMap;
691-
692-
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
693-
const rclcpp::GuardCondition *,
694-
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
695-
WeakCallbackGroupsToGuardConditionsMap;
696-
697-
/// maps nodes to guard conditions
698-
WeakNodesToGuardConditionsMap
699-
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
573+
/// Waitable containing guard conditions controlling the executor flow.
574+
/**
575+
* This waitable contains the interrupt and shutdown guard condition, as well
576+
* as the guard condition associated with each node and callback group.
577+
* By default, if any change is detected in the monitored entities, the notify
578+
* waitable will awake the executor and rebuild the collections.
579+
*/
580+
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
700581

701-
/// maps callback groups to guard conditions
702-
WeakCallbackGroupsToGuardConditionsMap
703-
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
582+
std::atomic_bool entities_need_rebuild_;
704583

705-
/// maps callback groups associated to nodes
706-
WeakCallbackGroupsToNodesMap
707-
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
584+
/// Collector used to associate executable entities from nodes and guard conditions
585+
rclcpp::executors::ExecutorEntitiesCollector collector_;
708586

709-
/// maps callback groups to nodes associated with executor
710-
WeakCallbackGroupsToNodesMap
711-
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
587+
/// WaitSet to be waited on.
588+
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
589+
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
712590

713-
/// maps all callback groups to nodes
714-
WeakCallbackGroupsToNodesMap
715-
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
591+
/// Hold the current state of the collection being waited on by the waitset
592+
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
593+
mutex_);
716594

717-
/// nodes that are associated with the executor
718-
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
719-
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
595+
/// Hold the current state of the notify waitable being waited on by the waitset
596+
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_
597+
RCPPUTILS_TSA_GUARDED_BY(mutex_);
720598

721599
/// shutdown callback handle registered to Context
722600
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;

rclcpp/include/rclcpp/executors/executor_entities_collection.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ struct ExecutorEntitiesCollection
178178

179179
/// Clear the entities collection
180180
void clear();
181+
182+
/// Remove entities that have expired weak ownership
183+
/**
184+
* \return The total number of removed entities
185+
*/
186+
size_t remove_expired_entities();
181187
};
182188

183189
/// Build an entities collection from callback groups

rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
4848
~ExecutorNotifyWaitable() override = default;
4949

5050
RCLCPP_PUBLIC
51-
ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other);
51+
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);
5252

5353

5454
RCLCPP_PUBLIC
55-
ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other);
55+
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);
5656

5757
/// Add conditions to the wait set
5858
/**

0 commit comments

Comments
 (0)