Skip to content

Commit b5eb517

Browse files
Janosch MachowinskiJanosch Machowinski
authored andcommitted
Made double_take_data test work without intrusive behavior
Signed-off-by: Janosch Machowinski <[email protected]>
1 parent 8c68c71 commit b5eb517

File tree

1 file changed

+201
-86
lines changed

1 file changed

+201
-86
lines changed

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 201 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -410,19 +410,36 @@ class TestWaitable : public rclcpp::Waitable
410410
add_to_wait_set(rcl_wait_set_t * wait_set) override
411411
{
412412
rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_);
413+
if (retrigger_guard_condition && num_unprocessed_triggers > 0) {
414+
gc_.trigger();
415+
}
413416
}
414417

415418
void trigger()
416419
{
420+
num_unprocessed_triggers++;
417421
gc_.trigger();
418422
}
419423

424+
void trigger_and_hold_execute()
425+
{
426+
hold_execute = true;
427+
428+
trigger();
429+
}
430+
431+
void release_execute()
432+
{
433+
hold_execute = false;
434+
cv.notify_one();
435+
}
436+
420437
bool
421438
is_ready(rcl_wait_set_t * wait_set) override
422439
{
440+
is_ready_called_before_take_data = true;
423441
for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) {
424442
if (&gc_.get_rcl_guard_condition() == wait_set->guard_conditions[i]) {
425-
is_ready_called_before_take_data = true;
426443
return true;
427444
}
428445
}
@@ -438,14 +455,17 @@ class TestWaitable : public rclcpp::Waitable
438455
}
439456

440457
is_ready_called_before_take_data = false;
458+
459+
num_unprocessed_triggers--;
460+
441461
return nullptr;
442462
}
443463

444464
std::shared_ptr<void>
445465
take_data_by_entity_id(size_t id) override
446466
{
447467
(void) id;
448-
return nullptr;
468+
return take_data();
449469
}
450470

451471
void
@@ -462,6 +482,12 @@ class TestWaitable : public rclcpp::Waitable
462482
throw;
463483
}
464484
}
485+
486+
if(hold_execute)
487+
{
488+
std::unique_lock<std::mutex> lk(cv_m);
489+
cv.wait(lk);
490+
}
465491
}
466492

467493
void
@@ -496,11 +522,22 @@ class TestWaitable : public rclcpp::Waitable
496522
return execute_promise_.get_future();
497523
}
498524

525+
void enable_retriggering(bool enabled)
526+
{
527+
retrigger_guard_condition = enabled;
528+
}
529+
499530
private:
500531
bool is_ready_called_before_take_data = false;
532+
bool retrigger_guard_condition = true;
501533
std::promise<void> execute_promise_;
502534
std::mutex execute_promise_mutex_;
535+
std::atomic<uint> num_unprocessed_triggers = 0;
536+
std::atomic<bool> hold_execute = false;
503537
size_t count_ = 0;
538+
std::condition_variable cv;
539+
std::mutex cv_m;
540+
504541
rclcpp::GuardCondition gc_;
505542
};
506543

@@ -547,136 +584,209 @@ TYPED_TEST(TestExecutors, spinAll)
547584
spinner.join();
548585
}
549586

550-
TEST(TestExecutorsOnlyNode, double_take_data)
587+
TEST(TestExecutors, double_take_data)
551588
{
552589
rclcpp::init(0, nullptr);
553590

591+
rclcpp::executors::MultiThreadedExecutor executor;
592+
554593
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
555594
std::stringstream test_name;
556595
test_name << test_info->test_case_name() << "_" << test_info->name();
557596
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>("node", test_name.str());
558597

559-
class MyExecutor : public rclcpp::executors::SingleThreadedExecutor
560-
{
561-
public:
562-
/**
563-
* This is a copy of Executor::get_next_executable with a callback, to test
564-
* for a special race condition
565-
*/
566-
bool get_next_executable_with_callback(
567-
rclcpp::AnyExecutable & any_executable,
568-
std::chrono::nanoseconds timeout,
569-
std::function<void(void)> inbetween)
570-
{
571-
bool success = false;
572-
// Check to see if there are any subscriptions or timers needing service
573-
// TODO(wjwwood): improve run to run efficiency of this function
574-
success = get_next_ready_executable(any_executable);
575-
// If there are none
576-
if (!success) {
577-
578-
inbetween();
579-
580-
// Wait for subscriptions or timers to work on
581-
wait_for_work(timeout);
582-
if (!spinning.load()) {
583-
return false;
584-
}
585-
// Try again
586-
success = get_next_ready_executable(any_executable);
587-
}
588-
return success;
589-
}
598+
auto waitable_interfaces = node->get_node_waitables_interface();
590599

591-
void spin_once_with_callback(
592-
std::chrono::nanoseconds timeout,
593-
std::function<void(void)> inbetween)
594-
{
595-
rclcpp::AnyExecutable any_exec;
596-
if (get_next_executable_with_callback(any_exec, timeout, inbetween)) {
597-
execute_any_executable(any_exec);
598-
}
599-
}
600+
auto first_cbg = node->create_callback_group(
601+
rclcpp::CallbackGroupType::MutuallyExclusive,
602+
true);
600603

601-
};
604+
auto third_cbg = node->create_callback_group(
605+
rclcpp::CallbackGroupType::MutuallyExclusive,
606+
true);
602607

603-
MyExecutor executor;
608+
// these waitable have one job, to make the MemoryStrategy::collect_enties method take
609+
// a long time, in order to force our race condition
610+
std::vector<std::shared_ptr<TestWaitable>> stuffing_waitables;
611+
std::vector<std::shared_ptr<rclcpp::CallbackGroup>> stuffing_cbgs;
604612

613+
for (size_t i = 0; i < 50; i++) {
614+
auto cbg = node->create_callback_group(
615+
rclcpp::CallbackGroupType::MutuallyExclusive,
616+
true);
617+
618+
stuffing_cbgs.push_back(cbg);
619+
620+
for (int j = 0; j < 200; j++) {
621+
auto waitable = std::make_shared<TestWaitable>();
622+
stuffing_waitables.push_back(waitable);
623+
waitable_interfaces->add_waitable(waitable, cbg);
624+
}
625+
}
626+
627+
// this is the callback group were wo introduce the double take
605628
auto callback_group = node->create_callback_group(
606629
rclcpp::CallbackGroupType::MutuallyExclusive,
607630
true);
608631

632+
609633
std::vector<std::shared_ptr<TestWaitable>> waitables;
610634

611-
auto waitable_interfaces = node->get_node_waitables_interface();
635+
auto w3 = std::make_shared<TestWaitable>();
636+
waitable_interfaces->add_waitable(w3, third_cbg);
637+
638+
// First group of waitables, that gets processed.
639+
// We use the shared count of these waitables and the callback group,
640+
// to estimate when MemoryStrategy::collect_enties is called in the spinner thread
641+
std::vector<std::shared_ptr<TestWaitable>> first_waitables;
642+
auto non_triggered_in_first_cbg = std::make_shared<TestWaitable>();
643+
waitable_interfaces->add_waitable(non_triggered_in_first_cbg, first_cbg);
644+
645+
auto non_triggered_in_first_cbg2 = std::make_shared<TestWaitable>();
646+
waitable_interfaces->add_waitable(non_triggered_in_first_cbg2, first_cbg);
647+
612648

613-
for (int i = 0; i < 3; i++) {
649+
auto cbg_start = std::make_shared<TestWaitable>();
650+
waitable_interfaces->add_waitable(cbg_start, callback_group);
651+
652+
// These waitables will get triggered while cbg_start is beeing executed
653+
for (int i = 0; i < 20; i++) {
614654
auto waitable = std::make_shared<TestWaitable>();
615655
waitables.push_back(waitable);
616656
waitable_interfaces->add_waitable(waitable, callback_group);
617657
}
658+
659+
// used to detect if all triggers were processed
660+
auto cbg_end = std::make_shared<TestWaitable>();
661+
waitable_interfaces->add_waitable(cbg_end, callback_group);
662+
618663
executor.add_node(node);
619664

620-
for (auto & waitable : waitables) {
621-
waitable->trigger();
665+
// ref count if not reference by the internals of the executor
666+
auto min_ref_cnt = non_triggered_in_first_cbg.use_count();
667+
auto cbg_min_ref_cnt = first_cbg.use_count();
668+
669+
for (auto & w : waitables) {
670+
EXPECT_EQ(w->get_count(), 0);
622671
}
623672

624-
// a node has some default subscribers, that need to get executed first, therefore the loop
625-
for (int i = 0; i < 10; i++) {
626-
executor.spin_once(std::chrono::milliseconds(10));
627-
if (waitables.front()->get_count() > 0) {
628-
// stop execution, after the first waitable has been executed
629-
break;
673+
std::atomic_bool exception = false;
674+
675+
std::thread t([&executor, &exception]() {
676+
try {
677+
executor.spin();
678+
} catch (const std::exception & e) {
679+
exception = true;
680+
} catch (...) {
681+
exception = true;
682+
}
683+
});
684+
685+
size_t start_count = cbg_start->get_count();
686+
cbg_start->trigger_and_hold_execute();
687+
688+
//wait until the first waitable is executed and blocks the callback_group
689+
while (cbg_start->get_count() == start_count) {
690+
std::this_thread::sleep_for(1ms);
691+
}
692+
693+
for (auto & w : waitables) {
694+
w->trigger();
695+
}
696+
697+
// trigger w3 to make sure, the MemoryStrategy clears its internal list of ready entities
698+
{
699+
auto cnt = w3->get_count();
700+
w3->trigger();
701+
while (w3->get_count() == cnt) {
702+
std::this_thread::sleep_for(1ms);
630703
}
631704
}
632705

633-
EXPECT_EQ(waitables.front()->get_count(), 1);
706+
// observe the use counts of non_triggered_in_first_cbg, non_triggered_in_first_cbg2 and first_cbg
707+
// in order to fugure out if MemoryStrategy::collect_enties is beeing calles
708+
while (true) {
709+
w3->trigger();
710+
bool restart = false;
634711

635-
// block the callback group, this is something that may happen during multi threaded execution
636-
// This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work
637-
callback_group->can_be_taken_from().exchange(false);
712+
// There should be no reference to our waitiable
713+
while (min_ref_cnt != non_triggered_in_first_cbg.use_count() ||
714+
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
715+
{
716+
}
638717

639-
bool no_ready_executable = false;
718+
// wait for the callback group to be taken
719+
while (true) {
720+
// node and callback group ptrs are referenced
721+
if (cbg_min_ref_cnt != first_cbg.use_count()) {
722+
break;
723+
}
640724

641-
//now there should be no ready events now,
642-
executor.spin_once_with_callback(
643-
std::chrono::milliseconds(10), [&]() {
644-
no_ready_executable = true;
645-
});
725+
// is we got more references to the waitable, while the group pointer is not referenced, this is the wrong spot
726+
if (min_ref_cnt != non_triggered_in_first_cbg.use_count() ||
727+
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
728+
{
729+
restart = true;
730+
break;
731+
}
732+
}
646733

647-
EXPECT_TRUE(no_ready_executable);
734+
if (restart) {
735+
continue;
736+
}
648737

649-
//rearm, so that rmw_wait will push a second entry into the queue
650-
for (auto & waitable : waitables) {
651-
waitable->trigger();
652-
}
738+
// callback group pointer is referenced
739+
while (true) {
740+
// trigger criteria, both pointers were collected
741+
if (min_ref_cnt != non_triggered_in_first_cbg.use_count() &&
742+
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
743+
{
744+
break;
745+
}
653746

654-
no_ready_executable = false;
747+
// invalid, second pointer is referenced, but not first one
748+
if (min_ref_cnt == non_triggered_in_first_cbg.use_count() &&
749+
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
750+
{
751+
restart = true;
752+
break;
753+
}
655754

656-
while (!no_ready_executable) {
657-
executor.spin_once_with_callback(
658-
std::chrono::milliseconds(10), [&]() {
659-
//unblock the callback group
660-
callback_group->can_be_taken_from().exchange(true);
755+
// group or node pointer was released, while waitable poitner were not taken.
756+
if (cbg_min_ref_cnt == first_cbg.use_count()) {
757+
restart = true;
758+
break;
759+
}
661760

662-
no_ready_executable = true;
761+
}
762+
if (restart) {
763+
continue;
764+
}
663765

664-
});
766+
break;
665767
}
666-
EXPECT_TRUE(no_ready_executable);
667768

668-
// now we process all events from get_next_ready_executable
669-
EXPECT_NO_THROW(
670-
for (int i = 0; i < 10; i++) {
671-
executor.spin_once(std::chrono::milliseconds(1));
769+
//we unblock the callback_group now, this should force the race condition
770+
cbg_start->release_execute();
771+
772+
std::this_thread::yield();
773+
std::this_thread::sleep_for(10ms);
774+
775+
size_t end_count = cbg_end->get_count();
776+
cbg_end->trigger();
777+
778+
// wait for all triggers to be executed, or for an exception to occur
779+
while (end_count == cbg_end->get_count() && !exception) {
780+
std::this_thread::sleep_for(10ms);
672781
}
673-
);
674782

675-
node.reset();
783+
EXPECT_FALSE(exception);
676784

785+
node.reset();
677786
rclcpp::shutdown();
678-
}
679787

788+
t.join();
789+
}
680790

681791
TYPED_TEST(TestExecutorsOnlyNode, missing_event)
682792
{
@@ -732,6 +842,11 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event)
732842
executor.spin_until_future_complete(my_waitable2_execute_future, max_spin_duration);
733843
}
734844

845+
EXPECT_EQ(1u, my_waitable->get_count());
846+
EXPECT_EQ(1u, my_waitable2->get_count());
847+
//now the second waitable should get processed
848+
executor.spin_once(std::chrono::milliseconds(10));
849+
735850
EXPECT_EQ(1u, my_waitable->get_count());
736851
EXPECT_EQ(1u, my_waitable2->get_count());
737852
}

0 commit comments

Comments
 (0)