Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ Executor::spin_until_future_complete_impl(
if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);
Expand Down Expand Up @@ -364,7 +364,7 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
if (spinning.exchange(true)) {
throw std::runtime_error("spin_some() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););

// clear the wait result and wait for work without blocking to collect the work
// for the first time
Expand Down Expand Up @@ -431,7 +431,7 @@ Executor::spin_once(std::chrono::nanoseconds timeout)
if (spinning.exchange(true)) {
throw std::runtime_error("spin_once() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
spin_once_impl(timeout);
}

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ MultiThreadedExecutor::spin()
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false););
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
std::vector<std::thread> threads;
size_t thread_id = 0;
{
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ SingleThreadedExecutor::spin()
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););

// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ ament_add_gtest(test_executor test_executor.cpp
TIMEOUT 120)
ament_add_test_label(test_executor mimick)
if(TARGET test_executor)
target_link_libraries(test_executor ${PROJECT_NAME} mimick)
target_link_libraries(test_executor ${PROJECT_NAME} mimick ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_graph_listener test_graph_listener.cpp)
Expand Down
61 changes: 61 additions & 0 deletions rclcpp/test/rclcpp/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

#include "rclcpp/executor.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/executors/multi_threaded_executor.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/strategies/allocator_memory_strategy.hpp"
#include "test_msgs/srv/empty.hpp"

#include "../mocking_utils/patch.hpp"
#include "../utils/rclcpp_gtest_macros.hpp"
Expand Down Expand Up @@ -508,3 +510,62 @@ TEST_F(TestExecutor, is_spinning) {

ASSERT_TRUE(timer_called);
}

class TestAllThreadedExecutor
: public ::testing::Test, public ::testing::WithParamInterface<std::string>
{
public:
void SetUp() override
{
rclcpp::init(0, nullptr);
}

void TearDown() override
{
rclcpp::shutdown();
}
};

TEST_P(TestAllThreadedExecutor, release_ownership_entity_after_spinning_cancel) {
using namespace std::chrono_literals;
const std::string single_threaded_executor(
typeid(rclcpp::executors::SingleThreadedExecutor).name());
const std::string multi_threaded_executor(
typeid(rclcpp::executors::MultiThreadedExecutor).name());

// Create an Executor
auto type_name = GetParam();
std::shared_ptr<rclcpp::Executor> executor;
if (single_threaded_executor == type_name) {
executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
} else if (multi_threaded_executor == type_name) {
executor = std::make_shared<rclcpp::executors::MultiThreadedExecutor>();
} else {
FAIL() << "Unsupported Executor Type !";
}

auto future = std::async(std::launch::async, [&executor] {executor->spin();});

auto node = std::make_shared<rclcpp::Node>("test_node");
auto callback = [](
const test_msgs::srv::Empty::Request::SharedPtr, test_msgs::srv::Empty::Response::SharedPtr) {
};
auto server = node->create_service<test_msgs::srv::Empty>("test_service", callback);
while (!executor->is_spinning()) {
std::this_thread::sleep_for(50ms);
}
executor->add_node(node);
std::this_thread::sleep_for(50ms);
executor->cancel();
std::future_status future_status = future.wait_for(1s);
EXPECT_EQ(future_status, std::future_status::ready);

EXPECT_EQ(server.use_count(), 1);
}

INSTANTIATE_TEST_SUITE_P(
TestAllThreadedExecutorWithParam,
TestAllThreadedExecutor,
::testing::Values(
std::string(typeid(rclcpp::executors::SingleThreadedExecutor).name()),
std::string(typeid(rclcpp::executors::MultiThreadedExecutor).name())));