Skip to content

Commit c118463

Browse files
author
Janosch Machowinski
committed
feat: Added support for multi thread wait / shutdown
This adds support for multiple threads waiting on the same clock, while an shutdown is invoked. Signed-off-by: Janosch Machowinski <[email protected]>
1 parent 36061ed commit c118463

File tree

3 files changed

+87
-20
lines changed

3 files changed

+87
-20
lines changed

rclcpp/include/rclcpp/clock.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,17 @@ class Clock
193193
bool
194194
ros_time_is_active();
195195

196+
/**
197+
* Cancels an ongoing or future sleep operation of one thread.
198+
*
199+
* This function is intended for multi threaded signaling. It can
200+
* be used by one thread, to wakeup another thread, using any
201+
* of the sleep_ or wait_ methods.
202+
*/
196203
RCLCPP_PUBLIC
197204
void
198205
cancel_sleep_or_wait();
199206

200-
201207
/// Return the rcl_clock_t clock handle
202208
RCLCPP_PUBLIC
203209
rcl_clock_t *

rclcpp/src/rclcpp/clock.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Clock::Impl
5050
rcl_clock_t rcl_clock_;
5151
rcl_allocator_t allocator_;
5252
bool stop_sleeping_ = false;
53+
bool shutdown_ = false;
5354
std::condition_variable cv_;
5455
std::mutex wait_mutex_;
5556
std::mutex clock_mutex_;
@@ -109,7 +110,11 @@ Clock::sleep_until(
109110
// Wake this thread if the context is shutdown
110111
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback(
111112
[this]() {
112-
cancel_sleep_or_wait();
113+
{
114+
std::unique_lock lock(impl_->wait_mutex_);
115+
impl_->shutdown_ = true;
116+
}
117+
impl_->cv_.notify_one();
113118
});
114119
// No longer need the shutdown callback when this function exits
115120
auto callback_remover = rcpputils::scope_exit(
@@ -127,7 +132,7 @@ Clock::sleep_until(
127132

128133
// loop over spurious wakeups but notice shutdown or stop of sleep
129134
std::unique_lock lock(impl_->wait_mutex_);
130-
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
135+
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
131136
impl_->cv_.wait_until(lock, chrono_until);
132137
}
133138
impl_->stop_sleeping_ = false;
@@ -139,7 +144,7 @@ Clock::sleep_until(
139144

140145
// loop over spurious wakeups but notice shutdown or stop of sleep
141146
std::unique_lock lock(impl_->wait_mutex_);
142-
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
147+
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
143148
impl_->cv_.wait_until(lock, system_time);
144149
}
145150
impl_->stop_sleeping_ = false;
@@ -171,7 +176,7 @@ Clock::sleep_until(
171176

172177
// loop over spurious wakeups but notice shutdown, stop of sleep or time source change
173178
std::unique_lock lock(impl_->wait_mutex_);
174-
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
179+
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
175180
!time_source_changed)
176181
{
177182
impl_->cv_.wait_until(lock, system_time);
@@ -182,7 +187,7 @@ Clock::sleep_until(
182187
// Just wait without "until" because installed
183188
// jump callbacks wake the cv on every new sample.
184189
std::unique_lock lock(impl_->wait_mutex_);
185-
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
190+
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
186191
!time_source_changed)
187192
{
188193
impl_->cv_.wait(lock);

rclcpp/test/rclcpp/test_clock.cpp

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,16 @@
2525
#include "../utils/rclcpp_gtest_macros.hpp"
2626

2727
using namespace std::chrono_literals;
28-
class TestClockWakeup : public ::testing::Test
28+
29+
enum class ClockType
30+
{
31+
RCL_STEADY_TIME,
32+
RCL_SYSTEM_TIME,
33+
RCL_ROS_TIME,
34+
};
35+
36+
37+
class TestClockWakeup : public ::testing::TestWithParam<rcl_clock_type_e>
2938
{
3039
public:
3140
void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock)
@@ -111,20 +120,15 @@ class TestClockWakeup : public ::testing::Test
111120
rclcpp::Node::SharedPtr node;
112121
};
113122

114-
TEST_F(TestClockWakeup, wakeup_sleep_steay_time) {
115-
auto clock = std::make_shared<rclcpp::Clock>(RCL_STEADY_TIME);
116-
test_wakeup_after_sleep(clock);
117-
test_wakeup_before_sleep(clock);
118-
}
119-
120-
TEST_F(TestClockWakeup, wakeup_sleep_system_time) {
121-
auto clock = std::make_shared<rclcpp::Clock>(RCL_SYSTEM_TIME);
122-
test_wakeup_after_sleep(clock);
123-
test_wakeup_before_sleep(clock);
124-
}
123+
INSTANTIATE_TEST_SUITE_P(
124+
Clocks,
125+
TestClockWakeup,
126+
::testing::Values(
127+
RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME
128+
));
125129

126-
TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) {
127-
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
130+
TEST_P(TestClockWakeup, wakeup_sleep) {
131+
auto clock = std::make_shared<rclcpp::Clock>(GetParam());
128132
test_wakeup_after_sleep(clock);
129133
test_wakeup_before_sleep(clock);
130134
}
@@ -178,3 +182,55 @@ TEST_F(TestClockWakeup, no_wakeup_on_sim_time) {
178182
EXPECT_TRUE(thread_finished);
179183
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
180184
}
185+
186+
TEST_F(TestClockWakeup, multiple_threads_wait_on_one_clock) {
187+
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
188+
189+
std::vector<bool> thread_finished(10, false);
190+
191+
std::vector<std::thread> threads;
192+
193+
for(size_t nr = 0; nr < thread_finished.size(); nr++) {
194+
threads.push_back(std::thread(
195+
[&clock, &thread_finished, nr]()
196+
{
197+
// make sure the thread starts sleeping late
198+
clock->sleep_until(clock->now() + std::chrono::seconds(10));
199+
thread_finished[nr] = true;
200+
}));
201+
}
202+
203+
// wait a bit so all threads can execute the sleep_until
204+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
205+
206+
for(const bool & finished : thread_finished) {
207+
EXPECT_FALSE(finished);
208+
}
209+
210+
rclcpp::shutdown();
211+
212+
auto start_time = std::chrono::steady_clock::now();
213+
auto cur_time = start_time;
214+
bool threads_finished = false;
215+
while (!threads_finished && start_time + std::chrono::seconds(1) > cur_time) {
216+
threads_finished = true;
217+
for(const bool finished : thread_finished) {
218+
if(!finished) {
219+
threads_finished = false;
220+
}
221+
}
222+
223+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
224+
cur_time = std::chrono::steady_clock::now();
225+
}
226+
227+
for(const bool finished : thread_finished) {
228+
EXPECT_TRUE(finished);
229+
}
230+
231+
for(auto & thread : threads) {
232+
thread.join();
233+
}
234+
235+
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
236+
}

0 commit comments

Comments
 (0)