Skip to content
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ class Clock
bool
ros_time_is_active();

/**
* Cancels an ongoing or future sleep operation of one thread.
*
* This function is intended for multi threaded signaling. It can
* be used by one thread, to wakeup another thread, using any
* of the sleep_ or wait_ methods.
*/
RCLCPP_PUBLIC
void
cancel_sleep_or_wait();

/// Return the rcl_clock_t clock handle
RCLCPP_PUBLIC
rcl_clock_t *
Expand Down
111 changes: 41 additions & 70 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,42 +242,30 @@ class Executor
* spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this
* function to be non-blocking.
*/
template<typename RepT = int64_t, typename T = std::milli>
void
RCLCPP_PUBLIC
virtual void
spin_node_once(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node,
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Convenience function which takes Node and forwards NodeBaseInterface.
template<typename NodeT = rclcpp::Node, typename RepT = int64_t, typename T = std::milli>
void
RCLCPP_PUBLIC
virtual void
spin_node_once(
std::shared_ptr<NodeT> node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node->get_node_base_interface(),
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
const std::shared_ptr<rclcpp::Node> & node,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
Comment on lines -245 to +256
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you absolutely need to make these non-template? I'm afraid existing code may not compile against this if certain types of durations are passed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really need to, but all other functions in the executor have the same signature and compile just fine. Any std::chono::duration type should auto convert to std::chrono::nanoseconds.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably the others taking only nanoseconds is a probably a bug, see: https://stackoverflow.com/a/22362867

It seems like in C++20 we could have a function that takes std::chrono::duration<auto>, but we don't have that right now.

I'll remove this change in my fix up.


/// Add a node, complete all immediately available work, and remove the node.
/**
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_some(std::shared_ptr<rclcpp::Node> node);

/// Collect work once and execute all available work, optionally within a max duration.
Expand Down Expand Up @@ -307,14 +295,14 @@ class Executor
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_all(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds max_duration);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_all(std::shared_ptr<rclcpp::Node> node, std::chrono::nanoseconds max_duration);

/// Collect and execute work repeatedly within a duration or until no more work is available.
Expand Down Expand Up @@ -366,52 +354,11 @@ class Executor
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
return spin_until_future_complete_impl(std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout), [&future] (std::chrono::nanoseconds wait_time) {
return future.wait_for(wait_time);
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
);
}

/// Cancel any running spin* function, causing it to return.
Expand All @@ -420,7 +367,7 @@ class Executor
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
virtual void
cancel();

/// Returns true if the executor is currently spinning.
Expand All @@ -433,6 +380,15 @@ class Executor
is_spinning();

protected:
/// Constructor that will not initialize any non-trivial members.
/**
* This constructor is intended to be used by any derived executor
* that explicitly does not want to use the default implementation provided
* by this class.
* This constructor is guaranteed to not modify the system state.
*/
explicit Executor(const std::shared_ptr<rclcpp::Context> & context);

/// Add a node to executor, execute the next available unit of work, and remove the node.
/**
* Implementation of spin_node_once using std::chrono::nanoseconds
Expand All @@ -447,6 +403,21 @@ class Executor
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds timeout);

/// Spin (blocking) until the get_future_status returns ready, max_duration is reached,
/// or rclcpp is interrupted.
/**
* \param[in] max_duration Optional duration parameter, which gets passed to Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \param[in] wait_for_future A function waiting for a future to become ready.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
RCLCPP_PUBLIC
virtual FutureReturnCode spin_until_future_complete_impl(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
virtual FutureReturnCode spin_until_future_complete_impl(
RCLCPP_PUBLIC
virtual FutureReturnCode
spin_until_future_complete_impl(

Also, needs at least a minimal docstring.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said above, I'd consider dropping this change, but if you keep it, consider the name spin_until_future_complete_nanoseconds() since that would match other functions like spin_node_once() -> spin_node_once_nanoseconds().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We got two colliding styles here, there is also
spin_once -> spin_once_impl and spin_some -> spin_some_impl

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We got two colliding styles here, there is also
spin_once -> spin_once_impl and spin_some -> spin_some_impl

Yeah, I'll leave it as-is, but I think this is related to the templated functions that take duration vs just taking nanoseconds, I still believe the templated approach is better, and in that case the narrowing to the non-template protected methods are truly just to cast it down to nanoseconds, so that makes sense in those cases. Likely if I had noticed I would have suggesting the newer functions follow that pattern, but it's not that important now.

std::chrono::nanoseconds max_duration,
const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future);

/// Collect work and execute available work, optionally within a duration.
/**
* Implementation of spin_some and spin_all.
Expand Down
71 changes: 49 additions & 22 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Clock::Impl

rcl_clock_t rcl_clock_;
rcl_allocator_t allocator_;
bool stop_sleeping_ = false;
bool shutdown_ = false;
std::condition_variable cv_;
std::mutex wait_mutex_;
std::mutex clock_mutex_;
};

Expand Down Expand Up @@ -79,8 +83,20 @@ Clock::now() const
return now;
}

void
Clock::cancel_sleep_or_wait()
{
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->stop_sleeping_ = true;
}
impl_->cv_.notify_one();
}

bool
Clock::sleep_until(Time until, Context::SharedPtr context)
Clock::sleep_until(
Time until,
Context::SharedPtr context)
{
if (!context || !context->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
Expand All @@ -91,12 +107,14 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
}
bool time_source_changed = false;

std::condition_variable cv;

// Wake this thread if the context is shutdown
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback(
[&cv]() {
cv.notify_one();
[this]() {
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->shutdown_ = true;
}
impl_->cv_.notify_one();
});
// No longer need the shutdown callback when this function exits
auto callback_remover = rcpputils::scope_exit(
Expand All @@ -112,22 +130,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
const std::chrono::steady_clock::time_point chrono_until =
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, chrono_until);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, chrono_until);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_SYSTEM_TIME) {
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_ROS_TIME) {
// Install jump handler for any amount of time change, for two purposes:
// - if ROS time is active, check if time reached on each new clock sample
Expand All @@ -139,11 +159,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
threshold.min_forward.nanoseconds = 1;
auto clock_handler = create_jump_callback(
nullptr,
[&cv, &time_source_changed](const rcl_time_jump_t & jump) {
[this, &time_source_changed](const rcl_time_jump_t & jump) {
if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
std::lock_guard<std::mutex> lk(impl_->wait_mutex_);
time_source_changed = true;
}
cv.notify_one();
impl_->cv_.notify_one();
},
threshold);

Expand All @@ -153,19 +174,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown or time source change
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown, stop of sleep or time source change
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else {
// RCL_ROS_TIME with ros_time_is_active.
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait(lock);
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait(lock);
}
impl_->stop_sleeping_ = false;
}
}

Expand Down
Loading