Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
src/rclcpp/executors/static_single_threaded_executor.cpp
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
src/rclcpp/experimental/timers_manager.cpp
src/rclcpp/future_return_code.cpp
src/rclcpp/generic_publisher.cpp
src/rclcpp/generic_subscription.cpp
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ struct AnyExecutable
rclcpp::ClientBase::SharedPtr client;
rclcpp::Waitable::SharedPtr waitable;
// These are used to keep the scope on the containing items
rclcpp::CallbackGroup::SharedPtr callback_group;
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base;
std::shared_ptr<void> data;
rclcpp::CallbackGroup::SharedPtr callback_group {nullptr};
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};
std::shared_ptr<void> data {nullptr};
};

} // namespace rclcpp
Expand Down
27 changes: 25 additions & 2 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,41 @@ class CallbackGroup
* \return the number of entities in the callback group.
*/
RCLCPP_PUBLIC
size_t size() const;
size_t
size() const;

/// Return a reference to the 'can be taken' atomic boolean.
/**
* The resulting bool will be true in the case that no executor is currently
* using an executable entity from this group.
* The resulting bool will be false in the case that an executor is currently
* using an executable entity from this group, and the group policy doesn't
* allow a second take (eg mutual exclusion)
* \return a reference to the flag
*/
RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();

/// Get the group type.
/**
* \return the group type
*/
RCLCPP_PUBLIC
const CallbackGroupType &
type() const;

/// Collect all of the entity pointers contained in this callback group.
/**
* \param[in] sub_func Function to execute for each subscription
* \param[in] service_func Function to execute for each service
* \param[in] client_func Function to execute for each client
* \param[in] timer_func Function to execute for each timer
* \param[in] waitable_fuinc Function to execute for each waitable
*/
RCLCPP_PUBLIC
void collect_all_ptrs(
void
collect_all_ptrs(
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
std::function<void(const rclcpp::ClientBase::SharedPtr &)> client_func,
Expand Down
162 changes: 45 additions & 117 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,24 @@

#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rcpputils/scope_exit.hpp"

#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/executor_entities_collector.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_set.hpp"

namespace rclcpp
{

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;

// Forward declaration is used in convenience method signature.
class Node;
class ExecutorImplementation;
Expand Down Expand Up @@ -393,17 +391,6 @@ class Executor
void
cancel();

/// Support dynamic switching of the memory strategy.
/**
* Switching the memory strategy while the executor is spinning in another threading could have
* unintended consequences.
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
* \throws std::runtime_error if memory_strategy is null
*/
RCLCPP_PUBLIC
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);

/// Returns true if the executor is currently spinning.
/**
* This function can be called asynchronously from any thread.
Expand Down Expand Up @@ -441,7 +428,7 @@ class Executor

RCLCPP_PUBLIC
static void
execute_timer(rclcpp::TimerBase::SharedPtr timer);
execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & dataPtr);

RCLCPP_PUBLIC
static void
Expand All @@ -451,89 +438,46 @@ class Executor
static void
execute_client(rclcpp::ClientBase::SharedPtr client);

/// Gather all of the waitable entities from associated nodes and callback groups.
RCLCPP_PUBLIC
void
collect_entities();

/// Block until more work becomes avilable or timeout is reached.
/**
* \throws std::runtime_error if the wait set can be cleared
*/
RCLCPP_PUBLIC
void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);

/// Return true if the node has been added to this executor.
/// Check for executable in ready state and populate union structure.
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
* \param[in] weak_groups_to_nodes map to nodes to lookup
* \return true if the node is associated with the executor, otherwise false
* \param[out] any_executable populated union structure of ready executable
* \return true if an executable was ready and any_executable was populated,
* otherwise false
*/
RCLCPP_PUBLIC
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;

RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);

/// Add a callback group to an executor
/**
* \see rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
virtual void
add_callback_group_to_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
get_next_ready_executable(AnyExecutable & any_executable);

/// Remove a callback group from the executor.
/// Wait for executable in ready state and populate union structure.
/**
* \see rclcpp::Executor::remove_callback_group
* If an executable is ready, it will return immediately, otherwise
* block based on the timeout for work to become ready.
*
* \param[out] any_executable populated union structure of ready executable
* \param[in] timeout duration of time to wait for work, a negative value
* (the defualt behavior), will make this function block indefinitely
* \return true if an executable was ready and any_executable was populated,
* otherwise false
*/
RCLCPP_PUBLIC
virtual void
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

RCLCPP_PUBLIC
bool
get_next_ready_executable(AnyExecutable & any_executable);

RCLCPP_PUBLIC
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);

RCLCPP_PUBLIC
bool
get_next_executable(
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Add all callback groups that can be automatically added from associated nodes.
/**
* The executor, before collecting entities, verifies if any callback group from
* nodes associated with the executor, which is not already associated to an executor,
* can be automatically added to this executor.
* This takes care of any callback group that has been added to a node but not explicitly added
* to the executor.
* It is important to note that in order for the callback groups to be automatically added to an
* executor through this function, the node of the callback groups needs to have been added
* through the `add_node` method.
*/
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

Expand All @@ -543,16 +487,8 @@ class Executor
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

// Mutex to protect the subsequent memory_strategy_.
mutable std::mutex mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);

/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;

Expand All @@ -562,39 +498,31 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Waitable containing guard conditions controlling the executor flow.
/**
* This waitable contains the interrupt and shutdown guard condition, as well
* as the guard condition associated with each node and callback group.
* By default, if any change is detected in the monitored entities, the notify
* waitable will awake the executor and rebuild the collections.
*/
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;

/// maps callback groups to guard conditions
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::atomic_bool entities_need_rebuild_;

/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Collector used to associate executable entities from nodes and guard conditions
rclcpp::executors::ExecutorEntitiesCollector collector_;

/// maps callback groups to nodes associated with executor
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// WaitSet to be waited on.
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps all callback groups to nodes
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Hold the current state of the collection being waited on by the waitset
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);

/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Hold the current state of the notify waitable being waited on by the waitset
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_
RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// shutdown callback handle registered to Context
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ struct ExecutorEntitiesCollection

/// Clear the entities collection
void clear();

/// Remove entities that have expired weak ownership
/**
* \return The total number of removed entities
*/
size_t remove_expired_entities();
};

/// Build an entities collection from callback groups
Expand Down
37 changes: 33 additions & 4 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
~ExecutorNotifyWaitable() override = default;

RCLCPP_PUBLIC
ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other);
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);


RCLCPP_PUBLIC
ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other);
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);

/// Add conditions to the wait set
/**
Expand Down Expand Up @@ -88,6 +88,25 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
std::shared_ptr<void>
take_data() override;

/// Take the data from an entity ID so that it can be consumed with `execute`.
/**
* \param[in] id ID of the entity to take data from.
* \return If available, data to be used, otherwise nullptr
* \sa rclcpp::Waitable::take_data_by_entity_id
*/
RCLCPP_PUBLIC
std::shared_ptr<void>
take_data_by_entity_id(size_t id) override;

/// Set a callback to be called whenever the waitable becomes ready.
/**
* \param[in] callback callback to set
* \sa rclcpp::Waitable::set_on_ready_callback
*/
RCLCPP_PUBLIC
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override;

/// Add a guard condition to be waited on.
/**
* \param[in] guard_condition The guard condition to add.
Expand All @@ -96,13 +115,21 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
void
add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);

/// Unset any callback registered via set_on_ready_callback.
/**
* \sa rclcpp::Waitable::clear_on_ready_callback
*/
RCLCPP_PUBLIC
void
clear_on_ready_callback() override;

/// Remove a guard condition from being waited on.
/**
* \param[in] guard_condition The guard condition to remove.
* \param[in] weak_guard_condition The guard condition to remove.
*/
RCLCPP_PUBLIC
void
remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);
remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition);

/// Get the number of ready guard_conditions
/**
Expand All @@ -118,6 +145,8 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable

std::mutex guard_condition_mutex_;

std::function<void(size_t)> on_ready_callback_;

/// The collection of guard conditions to be waited on.
std::set<rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
Expand Down
Loading