Skip to content

Commit 8a470c3

Browse files
committed
Revert "Avoid losing waitable handles while using MultiThreadedExecutor (#2109)"
This reverts commit 232262c. Signed-off-by: William Woodall <[email protected]>
1 parent f9c4894 commit 8a470c3

File tree

2 files changed

+17
-178
lines changed

2 files changed

+17
-178
lines changed

rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
#include <memory>
1919
#include <vector>
20-
#include <utility>
2120

2221
#include "rcl/allocator.h"
2322

@@ -146,7 +145,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
146145
timer_handles_.end()
147146
);
148147

149-
waitable_handles_.clear();
148+
waitable_handles_.erase(
149+
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
150+
waitable_handles_.end()
151+
);
150152
}
151153

152154
bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
@@ -390,17 +392,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
390392
rclcpp::AnyExecutable & any_exec,
391393
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
392394
{
393-
auto & waitable_handles = waitable_triggered_handles_;
394-
auto it = waitable_handles.begin();
395-
while (it != waitable_handles.end()) {
395+
auto it = waitable_handles_.begin();
396+
while (it != waitable_handles_.end()) {
396397
std::shared_ptr<Waitable> & waitable = *it;
397398
if (waitable) {
398399
// Find the group for this handle and see if it can be serviced
399400
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
400401
if (!group) {
401402
// Group was not found, meaning the waitable is not valid...
402403
// Remove it from the ready list and continue looking
403-
it = waitable_handles.erase(it);
404+
it = waitable_handles_.erase(it);
404405
continue;
405406
}
406407
if (!group->can_be_taken_from().load()) {
@@ -413,11 +414,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
413414
any_exec.waitable = waitable;
414415
any_exec.callback_group = group;
415416
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
416-
waitable_handles.erase(it);
417+
waitable_handles_.erase(it);
417418
return;
418419
}
419420
// Else, the waitable is no longer valid, remove it and continue
420-
it = waitable_handles.erase(it);
421+
it = waitable_handles_.erase(it);
421422
}
422423
}
423424

@@ -498,8 +499,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
498499
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
499500
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
500501

501-
VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;
502-
503502
std::shared_ptr<VoidAlloc> allocator_;
504503
};
505504

rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp

Lines changed: 8 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -60,57 +60,6 @@ class TestWaitable : public rclcpp::Waitable
6060
void execute(const std::shared_ptr<void> &) override {}
6161
};
6262

63-
static bool test_waitable_result2 = false;
64-
65-
class TestWaitable2 : public rclcpp::Waitable
66-
{
67-
public:
68-
explicit TestWaitable2(rcl_publisher_t * pub_ptr)
69-
: pub_ptr_(pub_ptr),
70-
pub_event_(rcl_get_zero_initialized_event())
71-
{
72-
EXPECT_EQ(
73-
rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
74-
RCL_RET_OK);
75-
}
76-
77-
~TestWaitable2()
78-
{
79-
EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK);
80-
}
81-
82-
void add_to_wait_set(rcl_wait_set_t & wait_set) override
83-
{
84-
EXPECT_EQ(rcl_wait_set_add_event(&wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK);
85-
}
86-
87-
bool is_ready(const rcl_wait_set_t &) override
88-
{
89-
return test_waitable_result2;
90-
}
91-
92-
std::shared_ptr<void>
93-
take_data() override
94-
{
95-
return nullptr;
96-
}
97-
98-
void execute(const std::shared_ptr<void> & data) override
99-
{
100-
(void) data;
101-
}
102-
103-
size_t get_number_of_ready_events() override
104-
{
105-
return 1;
106-
}
107-
108-
private:
109-
rcl_publisher_t * pub_ptr_;
110-
rcl_event_t pub_event_;
111-
size_t wait_set_event_index_;
112-
};
113-
11463
struct RclWaitSetSizes
11564
{
11665
size_t size_of_subscriptions = 0;
@@ -705,129 +654,20 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) {
705654
}
706655

707656
TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) {
657+
auto node1 = std::make_shared<rclcpp::Node>("waitable_node", "ns");
658+
auto node2 = std::make_shared<rclcpp::Node>("waitable_node2", "ns");
659+
rclcpp::Waitable::SharedPtr waitable1 = std::make_shared<TestWaitable>();
660+
rclcpp::Waitable::SharedPtr waitable2 = std::make_shared<TestWaitable>();
661+
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
662+
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);
663+
708664
auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) {
709665
rclcpp::AnyExecutable result;
710666
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
711667
return result;
712668
};
713669

714-
{
715-
auto node1 = std::make_shared<rclcpp::Node>(
716-
"waitable_node", "ns",
717-
rclcpp::NodeOptions()
718-
.start_parameter_event_publisher(false)
719-
.start_parameter_services(false));
720-
721-
rclcpp::PublisherOptions pub_options;
722-
pub_options.use_default_callbacks = false;
723-
724-
auto pub1 = node1->create_publisher<test_msgs::msg::Empty>(
725-
"test_topic_1", rclcpp::QoS(10), pub_options);
726-
727-
auto waitable1 =
728-
std::make_shared<TestWaitable2>(pub1->get_publisher_handle().get());
729-
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
730-
731-
auto basic_node = create_node_with_disabled_callback_groups("basic_node");
732-
WeakCallbackGroupsToNodesMap weak_groups_to_nodes;
733-
basic_node->for_each_callback_group(
734-
[basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
735-
{
736-
weak_groups_to_nodes.insert(
737-
std::pair<rclcpp::CallbackGroup::WeakPtr,
738-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
739-
group_ptr,
740-
basic_node->get_node_base_interface()));
741-
});
742-
node1->for_each_callback_group(
743-
[node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
744-
{
745-
weak_groups_to_nodes.insert(
746-
std::pair<rclcpp::CallbackGroup::WeakPtr,
747-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
748-
group_ptr,
749-
node1->get_node_base_interface()));
750-
});
751-
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);
752-
753-
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
754-
ASSERT_EQ(
755-
rcl_wait_set_init(
756-
&wait_set,
757-
allocator_memory_strategy()->number_of_ready_subscriptions(),
758-
allocator_memory_strategy()->number_of_guard_conditions(),
759-
allocator_memory_strategy()->number_of_ready_timers(),
760-
allocator_memory_strategy()->number_of_ready_clients(),
761-
allocator_memory_strategy()->number_of_ready_services(),
762-
allocator_memory_strategy()->number_of_ready_events(),
763-
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
764-
allocator_memory_strategy()->get_allocator()),
765-
RCL_RET_OK);
766-
767-
ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set));
768-
769-
ASSERT_EQ(
770-
rcl_wait(
771-
&wait_set,
772-
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(100))
773-
.count()),
774-
RCL_RET_OK);
775-
test_waitable_result2 = true;
776-
allocator_memory_strategy()->remove_null_handles(&wait_set);
777-
778-
rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes);
779-
EXPECT_EQ(result.node_base, node1->get_node_base_interface());
780-
test_waitable_result2 = false;
781-
782-
EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK);
783-
}
784-
785-
{
786-
auto node2 = std::make_shared<rclcpp::Node>(
787-
"waitable_node2", "ns",
788-
rclcpp::NodeOptions()
789-
.start_parameter_services(false)
790-
.start_parameter_event_publisher(false));
791-
792-
rclcpp::PublisherOptions pub_options;
793-
pub_options.use_default_callbacks = false;
794-
795-
auto pub2 = node2->create_publisher<test_msgs::msg::Empty>(
796-
"test_topic_2", rclcpp::QoS(10), pub_options);
797-
798-
auto waitable2 =
799-
std::make_shared<TestWaitable2>(pub2->get_publisher_handle().get());
800-
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);
801-
802-
auto basic_node2 = std::make_shared<rclcpp::Node>(
803-
"basic_node2", "ns",
804-
rclcpp::NodeOptions()
805-
.start_parameter_services(false)
806-
.start_parameter_event_publisher(false));
807-
WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes;
808-
basic_node2->for_each_callback_group(
809-
[basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
810-
{
811-
weak_groups_to_uncollected_nodes.insert(
812-
std::pair<rclcpp::CallbackGroup::WeakPtr,
813-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
814-
group_ptr,
815-
basic_node2->get_node_base_interface()));
816-
});
817-
node2->for_each_callback_group(
818-
[node2,
819-
&weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
820-
{
821-
weak_groups_to_uncollected_nodes.insert(
822-
std::pair<rclcpp::CallbackGroup::WeakPtr,
823-
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
824-
group_ptr,
825-
node2->get_node_base_interface()));
826-
});
827-
828-
rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes);
829-
EXPECT_EQ(failed_result.node_base, nullptr);
830-
}
670+
EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity));
831671
}
832672

833673
TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {

0 commit comments

Comments
 (0)