|
26 | 26 | #include "rcl/allocator.h" |
27 | 27 | #include "rcl/error_handling.h" |
28 | 28 | #include "rclcpp/executors/executor_notify_waitable.hpp" |
| 29 | +#include "rclcpp/logging.hpp" |
29 | 30 | #include "rclcpp/subscription_wait_set_mask.hpp" |
30 | 31 | #include "rcpputils/scope_exit.hpp" |
31 | 32 |
|
@@ -480,9 +481,18 @@ Executor::execute_any_executable(AnyExecutable & any_exec) |
480 | 481 | const std::shared_ptr<void> & const_data = any_exec.data; |
481 | 482 | any_exec.waitable->execute(const_data); |
482 | 483 | } |
| 484 | +} |
483 | 485 |
|
484 | | - // Reset the callback_group, regardless of type |
485 | | - any_exec.callback_group->can_be_taken_from().store(true); |
| 486 | +void |
| 487 | +Executor::trigger_executor_notify() |
| 488 | +{ |
| 489 | + try { |
| 490 | + interrupt_guard_condition_->trigger(); |
| 491 | + } catch (const rclcpp::exceptions::RCLError & ex) { |
| 492 | + throw std::runtime_error( |
| 493 | + std::string( |
| 494 | + "Failed to trigger guard condition on callback group change: ") + ex.what()); |
| 495 | + } |
486 | 496 | } |
487 | 497 |
|
488 | 498 | template<typename Taker, typename Handler> |
@@ -770,6 +780,53 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) |
770 | 780 | } |
771 | 781 | } |
772 | 782 |
|
| 783 | +void Executor::prepare_work() |
| 784 | +{ |
| 785 | + { |
| 786 | + std::lock_guard<std::mutex> guard(mutex_); |
| 787 | + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { |
| 788 | + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Rebuilding executor entities"); |
| 789 | + this->collect_entities(); |
| 790 | + } |
| 791 | + |
| 792 | + auto callback_groups = this->collector_.get_all_callback_groups(); |
| 793 | + wait_for_work_cbgs_.resize(callback_groups.size()); |
| 794 | + for(const auto & w_ptr : callback_groups) { |
| 795 | + auto shr_ptr = w_ptr.lock(); |
| 796 | + if(shr_ptr) { |
| 797 | + wait_for_work_cbgs_.push_back(std::move(shr_ptr)); |
| 798 | + } |
| 799 | + } |
| 800 | + } |
| 801 | +} |
| 802 | + |
| 803 | +void |
| 804 | +Executor::wait_for_work_simple(std::chrono::nanoseconds timeout) |
| 805 | +{ |
| 806 | + TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); |
| 807 | + |
| 808 | + this->wait_result_.reset(); |
| 809 | + this->wait_result_.emplace(wait_set_.wait(timeout)); |
| 810 | + |
| 811 | + // drop references to the callback groups, before trying to execute anything |
| 812 | + wait_for_work_cbgs_.clear(); |
| 813 | + |
| 814 | + if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) { |
| 815 | + RCUTILS_LOG_WARN_NAMED( |
| 816 | + "rclcpp", |
| 817 | + "empty wait set received in wait(). This should never happen."); |
| 818 | + } else { |
| 819 | + if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) { |
| 820 | + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Getting wait set"); |
| 821 | + auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); |
| 822 | + if (current_notify_waitable_->is_ready(rcl_wait_set)) { |
| 823 | + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Executing notify waitable"); |
| 824 | + current_notify_waitable_->execute(current_notify_waitable_->take_data()); |
| 825 | + } |
| 826 | + } |
| 827 | + } |
| 828 | +} |
| 829 | + |
773 | 830 | bool |
774 | 831 | Executor::get_next_ready_executable(AnyExecutable & any_executable) |
775 | 832 | { |
@@ -896,20 +953,26 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) |
896 | 953 | bool |
897 | 954 | Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout) |
898 | 955 | { |
| 956 | + notify_mutex_.lock(); |
899 | 957 | bool success = false; |
900 | 958 | // Check to see if there are any subscriptions or timers needing service |
901 | 959 | // TODO(wjwwood): improve run to run efficiency of this function |
902 | 960 | success = get_next_ready_executable(any_executable); |
903 | 961 | // If there are none |
904 | 962 | if (!success) { |
905 | 963 | // Wait for subscriptions or timers to work on |
| 964 | + prepare_work(); |
| 965 | + notify_mutex_.unlock(); |
906 | 966 | wait_for_work(timeout); |
907 | 967 | if (!spinning.load()) { |
908 | 968 | return false; |
909 | 969 | } |
910 | 970 | // Try again |
911 | 971 | success = get_next_ready_executable(any_executable); |
912 | 972 | } |
| 973 | + else { |
| 974 | + notify_mutex_.unlock(); |
| 975 | + } |
913 | 976 | return success; |
914 | 977 | } |
915 | 978 |
|
|
0 commit comments