Skip to content

Commit 8feb7cc

Browse files
mjcarrollclalancettewjwwood
authored andcommitted
Utilize rclcpp::WaitSet as part of the executors (ros2#2142)
* Deprecate callback_group call taking context Signed-off-by: Michael Carroll <[email protected]> * Add base executor objects that can be used by implementors Signed-off-by: Michael Carroll <[email protected]> * Template common operations Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback: * Add callback to EntitiesCollector constructor * Make function to check automatically added callback groups take a list Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback and fix templates Signed-off-by: Michael Carroll <[email protected]> * Lint and docs Signed-off-by: Michael Carroll <[email protected]> * Make executor own the notify waitable Signed-off-by: Michael Carroll <[email protected]> * Add pending queue to collector, remove from waitable Also change node's get_guard_condition to return shared_ptr Signed-off-by: Michael Carroll <[email protected]> * Change interrupt guard condition to shared_ptr Check if guard condition is valid before adding it to the waitable Signed-off-by: Michael Carroll <[email protected]> * Lint and docs Signed-off-by: Michael Carroll <[email protected]> * Utilize rclcpp::WaitSet as part of the executors Signed-off-by: Michael Carroll <[email protected]> * Don't exchange atomic twice Signed-off-by: Michael Carroll <[email protected]> * Fix add_node and add more tests Signed-off-by: Michael Carroll <[email protected]> * Make get_notify_guard_condition follow API tick-tock Signed-off-by: Michael Carroll <[email protected]> * Improve callback group tick-tocking Signed-off-by: Michael Carroll <[email protected]> * Don't lock twice Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback Signed-off-by: Michael Carroll <[email protected]> * Add thread safety annotations and make locks consistent Signed-off-by: Michael Carroll <[email protected]> * @wip Signed-off-by: Michael Carroll <[email protected]> * Reset callback groups for multithreaded executor Signed-off-by: Michael Carroll <[email protected]> * Avoid many small function calls when building executables Signed-off-by: Michael Carroll <[email protected]> * Re-trigger guard condition if buffer has data Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback Signed-off-by: Michael Carroll <[email protected]> * Trace points Signed-off-by: Michael Carroll <[email protected]> * Remove tracepoints Signed-off-by: Michael Carroll <[email protected]> * Reducing diff Signed-off-by: Michael Carroll <[email protected]> * Reduce diff Signed-off-by: Michael Carroll <[email protected]> * Uncrustify Signed-off-by: Michael Carroll <[email protected]> * Restore tests Signed-off-by: Michael Carroll <[email protected]> * Back to weak_ptr and reduce test time Signed-off-by: Michael Carroll <[email protected]> * reduce diff and lint Signed-off-by: Michael Carroll <[email protected]> * Restore static single threaded tests that weren't working before Signed-off-by: Michael Carroll <[email protected]> * Restore more tests Signed-off-by: Michael Carroll <[email protected]> * Fix multithreaded test Signed-off-by: Michael Carroll <[email protected]> * Fix assert Signed-off-by: Michael Carroll <[email protected]> * Fix constructor test Signed-off-by: Michael Carroll <[email protected]> * Change ready_executables signature back Signed-off-by: Michael Carroll <[email protected]> * Don't enforce removing callback groups before nodes Signed-off-by: Michael Carroll <[email protected]> * Remove the "add_valid_node" API Signed-off-by: Michael Carroll <[email protected]> * Only notify if the trigger condition is valid Signed-off-by: Michael Carroll <[email protected]> * Only trigger if valid and needed Signed-off-by: Michael Carroll <[email protected]> * Fix spin_some/spin_all implementation Signed-off-by: Michael Carroll <[email protected]> * Restore single threaded executor Signed-off-by: Michael Carroll <[email protected]> * Picking ABI-incompatible executor changes Signed-off-by: Michael Carroll <[email protected]> * Add PIMPL Signed-off-by: Michael Carroll <[email protected]> * Additional waitset prune Signed-off-by: Michael Carroll <[email protected]> * Fix bad merge Signed-off-by: Michael Carroll <[email protected]> * Expand test timeout Signed-off-by: Michael Carroll <[email protected]> * Introduce method to clear expired entities from a collection Signed-off-by: Michael Carroll <[email protected]> * Make sure to call remove_expired_entities(). Signed-off-by: Chris Lalancette <[email protected]> * Prune queued work when callback group is removed Signed-off-by: Michael Carroll <[email protected]> * Prune subscriptions from dynamic storage Signed-off-by: Michael Carroll <[email protected]> * Styles fixes. Signed-off-by: Chris Lalancette <[email protected]> * Re-trigger guard conditions Signed-off-by: Michael Carroll <[email protected]> * Condense to just use watiable.take_data Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Address reviewer comments (nits) Signed-off-by: Michael Carroll <[email protected]> * Lock mutex when copying Signed-off-by: Michael Carroll <[email protected]> * Refactors to static single threaded based on reviewers Signed-off-by: Michael Carroll <[email protected]> * More small refactoring Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Lint Signed-off-by: Michael Carroll <[email protected]> * Add ready executable accessors to WaitResult Signed-off-by: Michael Carroll <[email protected]> * Make use of accessors from wait_set Signed-off-by: Michael Carroll <[email protected]> * Fix tests Signed-off-by: Michael Carroll <[email protected]> * Fix more tests Signed-off-by: Michael Carroll <[email protected]> * Tidy up single threaded executor implementation Signed-off-by: Michael Carroll <[email protected]> * Don't null out timer, rely on call Signed-off-by: Michael Carroll <[email protected]> * change how timers are checked from wait result in executors Signed-off-by: William Woodall <[email protected]> * peak -> peek Signed-off-by: William Woodall <[email protected]> * fix bug in next_waitable logic Signed-off-by: William Woodall <[email protected]> * fix bug in StaticSTE that broke the add callback groups to executor tests Signed-off-by: William Woodall <[email protected]> * style Signed-off-by: William Woodall <[email protected]> --------- Signed-off-by: Michael Carroll <[email protected]> Signed-off-by: Michael Carroll <[email protected]> Signed-off-by: Chris Lalancette <[email protected]> Signed-off-by: William Woodall <[email protected]> Co-authored-by: Chris Lalancette <[email protected]> Co-authored-by: William Woodall <[email protected]>
1 parent 4ddaaca commit 8feb7cc

32 files changed

+992
-2021
lines changed

rclcpp/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ set(${PROJECT_NAME}_SRCS
6363
src/rclcpp/executors/executor_notify_waitable.cpp
6464
src/rclcpp/executors/multi_threaded_executor.cpp
6565
src/rclcpp/executors/single_threaded_executor.cpp
66-
src/rclcpp/executors/static_executor_entities_collector.cpp
6766
src/rclcpp/executors/static_single_threaded_executor.cpp
6867
src/rclcpp/expand_topic_or_service_name.cpp
6968
src/rclcpp/future_return_code.cpp

rclcpp/include/rclcpp/any_executable.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ struct AnyExecutable
4545
rclcpp::ClientBase::SharedPtr client;
4646
rclcpp::Waitable::SharedPtr waitable;
4747
// These are used to keep the scope on the containing items
48-
rclcpp::CallbackGroup::SharedPtr callback_group;
49-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base;
50-
std::shared_ptr<void> data;
48+
rclcpp::CallbackGroup::SharedPtr callback_group {nullptr};
49+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};
50+
std::shared_ptr<void> data {nullptr};
5151
};
5252

5353
} // namespace rclcpp

rclcpp/include/rclcpp/callback_group.hpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,18 +185,41 @@ class CallbackGroup
185185
* \return the number of entities in the callback group.
186186
*/
187187
RCLCPP_PUBLIC
188-
size_t size() const;
188+
size_t
189+
size() const;
189190

191+
/// Return a reference to the 'can be taken' atomic boolean.
192+
/**
193+
* The resulting bool will be true in the case that no executor is currently
194+
* using an executable entity from this group.
195+
* The resulting bool will be false in the case that an executor is currently
196+
* using an executable entity from this group, and the group policy doesn't
197+
* allow a second take (eg mutual exclusion)
198+
* \return a reference to the flag
199+
*/
190200
RCLCPP_PUBLIC
191201
std::atomic_bool &
192202
can_be_taken_from();
193203

204+
/// Get the group type.
205+
/**
206+
* \return the group type
207+
*/
194208
RCLCPP_PUBLIC
195209
const CallbackGroupType &
196210
type() const;
197211

212+
/// Collect all of the entity pointers contained in this callback group.
213+
/**
214+
* \param[in] sub_func Function to execute for each subscription
215+
* \param[in] service_func Function to execute for each service
216+
* \param[in] client_func Function to execute for each client
217+
* \param[in] timer_func Function to execute for each timer
218+
* \param[in] waitable_fuinc Function to execute for each waitable
219+
*/
198220
RCLCPP_PUBLIC
199-
void collect_all_ptrs(
221+
void
222+
collect_all_ptrs(
200223
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
201224
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
202225
std::function<void(const rclcpp::ClientBase::SharedPtr &)> client_func,

rclcpp/include/rclcpp/executor.hpp

Lines changed: 44 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,24 @@
2929

3030
#include "rcl/guard_condition.h"
3131
#include "rcl/wait.h"
32+
#include "rclcpp/executors/executor_notify_waitable.hpp"
3233
#include "rcpputils/scope_exit.hpp"
3334

3435
#include "rclcpp/context.hpp"
3536
#include "rclcpp/contexts/default_context.hpp"
3637
#include "rclcpp/guard_condition.hpp"
3738
#include "rclcpp/executor_options.hpp"
39+
#include "rclcpp/executors/executor_entities_collection.hpp"
40+
#include "rclcpp/executors/executor_entities_collector.hpp"
3841
#include "rclcpp/future_return_code.hpp"
39-
#include "rclcpp/memory_strategies.hpp"
40-
#include "rclcpp/memory_strategy.hpp"
4142
#include "rclcpp/node_interfaces/node_base_interface.hpp"
4243
#include "rclcpp/utilities.hpp"
4344
#include "rclcpp/visibility_control.hpp"
45+
#include "rclcpp/wait_set.hpp"
4446

4547
namespace rclcpp
4648
{
4749

48-
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
49-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
50-
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
51-
5250
// Forward declaration is used in convenience method signature.
5351
class Node;
5452
class ExecutorImplementation;
@@ -393,17 +391,6 @@ class Executor
393391
void
394392
cancel();
395393

396-
/// Support dynamic switching of the memory strategy.
397-
/**
398-
* Switching the memory strategy while the executor is spinning in another threading could have
399-
* unintended consequences.
400-
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
401-
* \throws std::runtime_error if memory_strategy is null
402-
*/
403-
RCLCPP_PUBLIC
404-
void
405-
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
406-
407394
/// Returns true if the executor is currently spinning.
408395
/**
409396
* This function can be called asynchronously from any thread.
@@ -451,89 +438,46 @@ class Executor
451438
static void
452439
execute_client(rclcpp::ClientBase::SharedPtr client);
453440

441+
/// Gather all of the waitable entities from associated nodes and callback groups.
442+
RCLCPP_PUBLIC
443+
void
444+
collect_entities();
445+
446+
/// Block until more work becomes avilable or timeout is reached.
454447
/**
455448
* \throws std::runtime_error if the wait set can be cleared
456449
*/
457450
RCLCPP_PUBLIC
458451
void
459452
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
460453

461-
RCLCPP_PUBLIC
462-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
463-
get_node_by_group(
464-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
465-
rclcpp::CallbackGroup::SharedPtr group);
466-
467-
/// Return true if the node has been added to this executor.
454+
/// Check for executable in ready state and populate union structure.
468455
/**
469-
* \param[in] node_ptr a shared pointer that points to a node base interface
470-
* \param[in] weak_groups_to_nodes map to nodes to lookup
471-
* \return true if the node is associated with the executor, otherwise false
456+
* \param[out] any_executable populated union structure of ready executable
457+
* \return true if an executable was ready and any_executable was populated,
458+
* otherwise false
472459
*/
473460
RCLCPP_PUBLIC
474461
bool
475-
has_node(
476-
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
477-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
478-
479-
RCLCPP_PUBLIC
480-
rclcpp::CallbackGroup::SharedPtr
481-
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
482-
483-
/// Add a callback group to an executor
484-
/**
485-
* \see rclcpp::Executor::add_callback_group
486-
*/
487-
RCLCPP_PUBLIC
488-
virtual void
489-
add_callback_group_to_map(
490-
rclcpp::CallbackGroup::SharedPtr group_ptr,
491-
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
492-
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
493-
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
462+
get_next_ready_executable(AnyExecutable & any_executable);
494463

495-
/// Remove a callback group from the executor.
464+
/// Wait for executable in ready state and populate union structure.
496465
/**
497-
* \see rclcpp::Executor::remove_callback_group
466+
* If an executable is ready, it will return immediately, otherwise
467+
* block based on the timeout for work to become ready.
468+
*
469+
* \param[out] any_executable populated union structure of ready executable
470+
* \param[in] timeout duration of time to wait for work, a negative value
471+
* (the defualt behavior), will make this function block indefinitely
472+
* \return true if an executable was ready and any_executable was populated,
473+
* otherwise false
498474
*/
499475
RCLCPP_PUBLIC
500-
virtual void
501-
remove_callback_group_from_map(
502-
rclcpp::CallbackGroup::SharedPtr group_ptr,
503-
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
504-
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
505-
506-
RCLCPP_PUBLIC
507-
bool
508-
get_next_ready_executable(AnyExecutable & any_executable);
509-
510-
RCLCPP_PUBLIC
511-
bool
512-
get_next_ready_executable_from_map(
513-
AnyExecutable & any_executable,
514-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
515-
516-
RCLCPP_PUBLIC
517476
bool
518477
get_next_executable(
519478
AnyExecutable & any_executable,
520479
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
521480

522-
/// Add all callback groups that can be automatically added from associated nodes.
523-
/**
524-
* The executor, before collecting entities, verifies if any callback group from
525-
* nodes associated with the executor, which is not already associated to an executor,
526-
* can be automatically added to this executor.
527-
* This takes care of any callback group that has been added to a node but not explicitly added
528-
* to the executor.
529-
* It is important to note that in order for the callback groups to be automatically added to an
530-
* executor through this function, the node of the callback groups needs to have been added
531-
* through the `add_node` method.
532-
*/
533-
RCLCPP_PUBLIC
534-
virtual void
535-
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
536-
537481
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
538482
std::atomic_bool spinning;
539483

@@ -543,16 +487,8 @@ class Executor
543487
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
544488
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
545489

546-
/// Wait set for managing entities that the rmw layer waits on.
547-
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
548-
549-
// Mutex to protect the subsequent memory_strategy_.
550490
mutable std::mutex mutex_;
551491

552-
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
553-
memory_strategy::MemoryStrategy::SharedPtr
554-
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
555-
556492
/// The context associated with this executor.
557493
std::shared_ptr<rclcpp::Context> context_;
558494

@@ -562,39 +498,31 @@ class Executor
562498
virtual void
563499
spin_once_impl(std::chrono::nanoseconds timeout);
564500

565-
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
566-
const rclcpp::GuardCondition *,
567-
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
568-
WeakNodesToGuardConditionsMap;
569-
570-
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
571-
const rclcpp::GuardCondition *,
572-
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
573-
WeakCallbackGroupsToGuardConditionsMap;
574-
575-
/// maps nodes to guard conditions
576-
WeakNodesToGuardConditionsMap
577-
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
501+
/// Waitable containing guard conditions controlling the executor flow.
502+
/**
503+
* This waitable contains the interrupt and shutdown guard condition, as well
504+
* as the guard condition associated with each node and callback group.
505+
* By default, if any change is detected in the monitored entities, the notify
506+
* waitable will awake the executor and rebuild the collections.
507+
*/
508+
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
578509

579-
/// maps callback groups to guard conditions
580-
WeakCallbackGroupsToGuardConditionsMap
581-
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
510+
std::atomic_bool entities_need_rebuild_;
582511

583-
/// maps callback groups associated to nodes
584-
WeakCallbackGroupsToNodesMap
585-
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
512+
/// Collector used to associate executable entities from nodes and guard conditions
513+
rclcpp::executors::ExecutorEntitiesCollector collector_;
586514

587-
/// maps callback groups to nodes associated with executor
588-
WeakCallbackGroupsToNodesMap
589-
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
515+
/// WaitSet to be waited on.
516+
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
517+
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
590518

591-
/// maps all callback groups to nodes
592-
WeakCallbackGroupsToNodesMap
593-
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
519+
/// Hold the current state of the collection being waited on by the waitset
520+
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
521+
mutex_);
594522

595-
/// nodes that are associated with the executor
596-
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
597-
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
523+
/// Hold the current state of the notify waitable being waited on by the waitset
524+
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_
525+
RCPPUTILS_TSA_GUARDED_BY(mutex_);
598526

599527
/// shutdown callback handle registered to Context
600528
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;

rclcpp/include/rclcpp/executors/executor_entities_collection.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ struct ExecutorEntitiesCollection
178178

179179
/// Clear the entities collection
180180
void clear();
181+
182+
/// Remove entities that have expired weak ownership
183+
/**
184+
* \return The total number of removed entities
185+
*/
186+
size_t remove_expired_entities();
181187
};
182188

183189
/// Build an entities collection from callback groups

rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
4848
~ExecutorNotifyWaitable() override = default;
4949

5050
RCLCPP_PUBLIC
51-
ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other);
51+
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);
5252

5353

5454
RCLCPP_PUBLIC
55-
ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other);
55+
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);
5656

5757
/// Add conditions to the wait set
5858
/**

0 commit comments

Comments
 (0)