Skip to content

Commit 7ac0f83

Browse files
committed
Avoid losing waitable handles while using MultiThreadedExecutor
Signed-off-by: Barry Xu <[email protected]>
1 parent 28e4b1b commit 7ac0f83

File tree

4 files changed

+194
-19
lines changed

4 files changed

+194
-19
lines changed

rclcpp/include/rclcpp/memory_strategy.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ class RCLCPP_PUBLIC MemoryStrategy
9393
virtual void
9494
get_next_waitable(
9595
rclcpp::AnyExecutable & any_exec,
96-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0;
96+
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
97+
rcl_wait_set_t * wait_set) = 0;
9798

9899
virtual rcl_allocator_t
99100
get_allocator() = 0;

rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
122122
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
123123
if (!waitable_handles_[i]->is_ready(wait_set)) {
124124
waitable_handles_[i].reset();
125+
} else {
126+
waitable_triggered_handles_.emplace_back(waitable_handles_[i]);
125127
}
126128
}
127129

@@ -390,18 +392,19 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
390392
void
391393
get_next_waitable(
392394
rclcpp::AnyExecutable & any_exec,
393-
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
395+
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
396+
rcl_wait_set_t * wait_set) override
394397
{
395-
auto it = waitable_handles_.begin();
396-
while (it != waitable_handles_.end()) {
398+
auto it = waitable_triggered_handles_.begin();
399+
while (it != waitable_triggered_handles_.end()) {
397400
std::shared_ptr<Waitable> & waitable = *it;
398401
if (waitable) {
399402
// Find the group for this handle and see if it can be serviced
400403
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
401404
if (!group) {
402405
// Group was not found, meaning the waitable is not valid...
403406
// Remove it from the ready list and continue looking
404-
it = waitable_handles_.erase(it);
407+
it = waitable_triggered_handles_.erase(it);
405408
continue;
406409
}
407410
if (!group->can_be_taken_from().load()) {
@@ -410,15 +413,19 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
410413
++it;
411414
continue;
412415
}
416+
if (!waitable->is_ready(wait_set)) {
417+
it = waitable_triggered_handles_.erase(it);
418+
continue;
419+
}
413420
// Otherwise it is safe to set and return the any_exec
414421
any_exec.waitable = waitable;
415422
any_exec.callback_group = group;
416423
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
417-
waitable_handles_.erase(it);
424+
waitable_triggered_handles_.erase(it);
418425
return;
419426
}
420427
// Else, the waitable is no longer valid, remove it and continue
421-
it = waitable_handles_.erase(it);
428+
it = waitable_triggered_handles_.erase(it);
422429
}
423430
}
424431

@@ -499,6 +506,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
499506
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
500507
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
501508

509+
VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;
510+
502511
std::shared_ptr<VoidAlloc> allocator_;
503512
};
504513

rclcpp/src/rclcpp/executor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,7 @@ Executor::get_next_ready_executable_from_map(
884884
}
885885
if (!success) {
886886
// Check the waitables to see if there are any that are ready
887-
memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
887+
memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes, &wait_set_);
888888
if (any_executable.waitable) {
889889
any_executable.data = any_executable.waitable->take_data();
890890
success = true;

rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp

Lines changed: 176 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,60 @@ class TestWaitable : public rclcpp::Waitable
6363
}
6464
};
6565

66+
static bool test_waitable_result2 = false;
67+
68+
/**
69+
* Mock Waitable class, with a globally setable boolean result.
70+
*/
71+
class TestWaitable2 : public rclcpp::Waitable
72+
{
73+
public:
74+
explicit TestWaitable2(rcl_publisher_t * pub_ptr)
75+
: pub_ptr_(pub_ptr),
76+
pub_event_(rcl_get_zero_initialized_event())
77+
{
78+
EXPECT_EQ(
79+
rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
80+
RCL_RET_OK);
81+
}
82+
83+
~TestWaitable2()
84+
{
85+
EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK);
86+
}
87+
88+
void add_to_wait_set(rcl_wait_set_t * wait_set) override
89+
{
90+
EXPECT_EQ(rcl_wait_set_add_event(wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK);
91+
}
92+
93+
bool is_ready(rcl_wait_set_t *) override
94+
{
95+
return test_waitable_result2;
96+
}
97+
98+
std::shared_ptr<void>
99+
take_data() override
100+
{
101+
return nullptr;
102+
}
103+
104+
void execute(std::shared_ptr<void> & data) override
105+
{
106+
(void) data;
107+
}
108+
109+
size_t get_number_of_ready_events() override
110+
{
111+
return 1;
112+
}
113+
114+
private:
115+
rcl_publisher_t * pub_ptr_;
116+
rcl_event_t pub_event_;
117+
size_t wait_set_event_index_;
118+
};
119+
66120
struct RclWaitSetSizes
67121
{
68122
size_t size_of_subscriptions = 0;
@@ -657,20 +711,129 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) {
657711
}
658712

659713
TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) {
660-
auto node1 = std::make_shared<rclcpp::Node>("waitable_node", "ns");
661-
auto node2 = std::make_shared<rclcpp::Node>("waitable_node2", "ns");
662-
rclcpp::Waitable::SharedPtr waitable1 = std::make_shared<TestWaitable>();
663-
rclcpp::Waitable::SharedPtr waitable2 = std::make_shared<TestWaitable>();
664-
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
665-
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);
666-
667714
auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) {
668715
rclcpp::AnyExecutable result;
669-
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
716+
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes, nullptr);
670717
return result;
671718
};
672719

673-
EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity));
720+
{
721+
auto node1 = std::make_shared<rclcpp::Node>(
722+
"waitable_node", "ns",
723+
rclcpp::NodeOptions()
724+
.start_parameter_event_publisher(false)
725+
.start_parameter_services(false));
726+
727+
rclcpp::PublisherOptions pub_options;
728+
pub_options.use_default_callbacks = false;
729+
730+
auto pub1 = node1->create_publisher<test_msgs::msg::Empty>(
731+
"test_topic_1", rclcpp::QoS(10), pub_options);
732+
733+
auto waitable1 =
734+
std::make_shared<TestWaitable2>(pub1->get_publisher_handle().get());
735+
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
736+
737+
auto basic_node = create_node_with_disabled_callback_groups("basic_node");
738+
WeakCallbackGroupsToNodesMap weak_groups_to_nodes;
739+
basic_node->for_each_callback_group(
740+
[basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
741+
{
742+
weak_groups_to_nodes.insert(
743+
std::pair<rclcpp::CallbackGroup::WeakPtr,
744+
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
745+
group_ptr,
746+
basic_node->get_node_base_interface()));
747+
});
748+
node1->for_each_callback_group(
749+
[node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
750+
{
751+
weak_groups_to_nodes.insert(
752+
std::pair<rclcpp::CallbackGroup::WeakPtr,
753+
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
754+
group_ptr,
755+
node1->get_node_base_interface()));
756+
});
757+
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);
758+
759+
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
760+
ASSERT_EQ(
761+
rcl_wait_set_init(
762+
&wait_set,
763+
allocator_memory_strategy()->number_of_ready_subscriptions(),
764+
allocator_memory_strategy()->number_of_guard_conditions(),
765+
allocator_memory_strategy()->number_of_ready_timers(),
766+
allocator_memory_strategy()->number_of_ready_clients(),
767+
allocator_memory_strategy()->number_of_ready_services(),
768+
allocator_memory_strategy()->number_of_ready_events(),
769+
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
770+
allocator_memory_strategy()->get_allocator()),
771+
RCL_RET_OK);
772+
773+
ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set));
774+
775+
ASSERT_EQ(
776+
rcl_wait(
777+
&wait_set,
778+
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(100))
779+
.count()),
780+
RCL_RET_OK);
781+
test_waitable_result2 = true;
782+
allocator_memory_strategy()->remove_null_handles(&wait_set);
783+
784+
rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes);
785+
EXPECT_EQ(result.node_base, node1->get_node_base_interface());
786+
test_waitable_result2 = false;
787+
788+
EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK);
789+
}
790+
791+
{
792+
auto node2 = std::make_shared<rclcpp::Node>(
793+
"waitable_node2", "ns",
794+
rclcpp::NodeOptions()
795+
.start_parameter_services(false)
796+
.start_parameter_event_publisher(false));
797+
798+
rclcpp::PublisherOptions pub_options;
799+
pub_options.use_default_callbacks = false;
800+
801+
auto pub2 = node2->create_publisher<test_msgs::msg::Empty>(
802+
"test_topic_2", rclcpp::QoS(10), pub_options);
803+
804+
auto waitable2 =
805+
std::make_shared<TestWaitable2>(pub2->get_publisher_handle().get());
806+
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);
807+
808+
auto basic_node2 = std::make_shared<rclcpp::Node>(
809+
"basic_node2", "ns",
810+
rclcpp::NodeOptions()
811+
.start_parameter_services(false)
812+
.start_parameter_event_publisher(false));
813+
WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes;
814+
basic_node2->for_each_callback_group(
815+
[basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
816+
{
817+
weak_groups_to_uncollected_nodes.insert(
818+
std::pair<rclcpp::CallbackGroup::WeakPtr,
819+
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
820+
group_ptr,
821+
basic_node2->get_node_base_interface()));
822+
});
823+
node2->for_each_callback_group(
824+
[node2,
825+
&weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
826+
{
827+
weak_groups_to_uncollected_nodes.insert(
828+
std::pair<rclcpp::CallbackGroup::WeakPtr,
829+
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
830+
group_ptr,
831+
node2->get_node_base_interface()));
832+
});
833+
834+
rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes);
835+
EXPECT_EQ(failed_result.node_base, nullptr);
836+
}
674837
}
675838

676839
TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {
@@ -735,11 +898,13 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_mutually_exclusive) {
735898
callback_group->can_be_taken_from() = false;
736899

737900
rclcpp::AnyExecutable result;
738-
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
901+
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes, nullptr);
739902
return result;
740903
};
741904

905+
test_waitable_result = true;
742906
EXPECT_TRUE(TestGetNextEntityMutuallyExclusive(node, get_next_entity));
907+
test_waitable_result = false;
743908
}
744909

745910
TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_out_of_scope) {
@@ -902,6 +1067,6 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_out_of_scope) {
9021067
EXPECT_EQ(1u, allocator_memory_strategy()->number_of_waitables());
9031068

9041069
rclcpp::AnyExecutable result;
905-
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
1070+
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes, nullptr);
9061071
EXPECT_EQ(nullptr, result.node_base);
9071072
}

0 commit comments

Comments
 (0)