Skip to content

Commit 802bfc2

Browse files
Add wait_for_all_acked support (#1662)
* Add wait_for_all_acked support Signed-off-by: Barry Xu <[email protected]> * Update the description of wait_for_all_acked Signed-off-by: Barry Xu <[email protected]> * Use rcpputils::convert_to_nanoseconds() for time conversion Signed-off-by: Barry Xu <[email protected]>
1 parent 152dbc6 commit 802bfc2

File tree

2 files changed

+120
-1
lines changed

2 files changed

+120
-1
lines changed

rclcpp/include/rclcpp/publisher_base.hpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <rmw/error_handling.h>
1919
#include <rmw/rmw.h>
2020

21+
#include <chrono>
2122
#include <functional>
2223
#include <iostream>
2324
#include <memory>
@@ -33,6 +34,7 @@
3334
#include "rclcpp/qos_event.hpp"
3435
#include "rclcpp/type_support_decl.hpp"
3536
#include "rclcpp/visibility_control.hpp"
37+
#include "rcpputils/time.hpp"
3638

3739
namespace rclcpp
3840
{
@@ -203,6 +205,46 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
203205
std::vector<rclcpp::NetworkFlowEndpoint>
204206
get_network_flow_endpoints() const;
205207

208+
/// Wait until all published messages are acknowledged or until the specified timeout elapses.
209+
/**
210+
* This method waits until all published messages are acknowledged by all matching
211+
* subscriptions or the given timeout elapses.
212+
*
213+
* If the timeout is negative then this method will block indefinitely until all published
214+
* messages are acknowledged.
215+
* If the timeout is zero then this method will not block, it will check if all published
216+
* messages are acknowledged and return immediately.
217+
* If the timeout is greater than zero, this method will wait until all published messages are
218+
* acknowledged or the timeout elapses.
219+
*
220+
* This method only waits for acknowledgments if the publisher's QoS profile is RELIABLE.
221+
* Otherwise this method will immediately return `true`.
222+
*
223+
* \param[in] timeout the duration to wait for all published messages to be acknowledged.
224+
* \return `true` if all published messages were acknowledged before the given timeout
225+
* elapsed, otherwise `false`.
226+
* \throws rclcpp::exceptions::RCLError if middleware doesn't support or internal error occurs
227+
* \throws std::invalid_argument if timeout is greater than std::chrono::nanoseconds::max() or
228+
* less than std::chrono::nanoseconds::min()
229+
*/
230+
template<typename DurationRepT = int64_t, typename DurationT = std::milli>
231+
bool
232+
wait_for_all_acked(
233+
std::chrono::duration<DurationRepT, DurationT> timeout =
234+
std::chrono::duration<DurationRepT, DurationT>(-1)) const
235+
{
236+
rcl_duration_value_t rcl_timeout = rcpputils::convert_to_nanoseconds(timeout).count();
237+
238+
rcl_ret_t ret = rcl_publisher_wait_for_all_acked(publisher_handle_.get(), rcl_timeout);
239+
if (ret == RCL_RET_OK) {
240+
return true;
241+
} else if (ret == RCL_RET_TIMEOUT) {
242+
return false;
243+
} else {
244+
rclcpp::exceptions::throw_from_rcl_error(ret);
245+
}
246+
}
247+
206248
protected:
207249
template<typename EventCallbackT>
208250
void

rclcpp/test/rclcpp/test_publisher.cpp

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414

1515
#include <gtest/gtest.h>
1616

17-
#include <string>
17+
#include <chrono>
1818
#include <memory>
19+
#include <string>
1920
#include <utility>
2021
#include <vector>
2122

@@ -28,6 +29,7 @@
2829
#include "../utils/rclcpp_gtest_macros.hpp"
2930

3031
#include "test_msgs/msg/empty.hpp"
32+
#include "test_msgs/msg/strings.hpp"
3133

3234
class TestPublisher : public ::testing::Test
3335
{
@@ -536,3 +538,78 @@ TEST_F(TestPublisher, get_network_flow_endpoints_errors) {
536538
EXPECT_NO_THROW(publisher->get_network_flow_endpoints());
537539
}
538540
}
541+
542+
TEST_F(TestPublisher, check_wait_for_all_acked_return) {
543+
initialize();
544+
const rclcpp::QoS publisher_qos(1);
545+
auto publisher = node->create_publisher<test_msgs::msg::Empty>("topic", publisher_qos);
546+
547+
{
548+
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
549+
// defined in a header
550+
auto mock = mocking_utils::patch_and_return(
551+
"self", rcl_publisher_wait_for_all_acked, RCL_RET_OK);
552+
EXPECT_TRUE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1)));
553+
}
554+
555+
{
556+
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
557+
// defined in a header
558+
auto mock = mocking_utils::patch_and_return(
559+
"self", rcl_publisher_wait_for_all_acked, RCL_RET_TIMEOUT);
560+
EXPECT_FALSE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1)));
561+
}
562+
563+
{
564+
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
565+
// defined in a header
566+
auto mock = mocking_utils::patch_and_return(
567+
"self", rcl_publisher_wait_for_all_acked, RCL_RET_UNSUPPORTED);
568+
EXPECT_THROW(
569+
publisher->wait_for_all_acked(std::chrono::milliseconds(-1)),
570+
rclcpp::exceptions::RCLError);
571+
}
572+
573+
{
574+
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
575+
// defined in a header
576+
auto mock = mocking_utils::patch_and_return(
577+
"self", rcl_publisher_wait_for_all_acked, RCL_RET_ERROR);
578+
EXPECT_THROW(
579+
publisher->wait_for_all_acked(std::chrono::milliseconds(-1)),
580+
rclcpp::exceptions::RCLError);
581+
}
582+
}
583+
584+
class TestPublisherWaitForAllAcked
585+
: public TestPublisher, public ::testing::WithParamInterface<std::pair<rclcpp::QoS, rclcpp::QoS>>
586+
{
587+
};
588+
589+
TEST_P(TestPublisherWaitForAllAcked, check_wait_for_all_acked_with_QosPolicy) {
590+
initialize();
591+
592+
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Strings>) {};
593+
auto pub = node->create_publisher<test_msgs::msg::Strings>("topic", std::get<0>(GetParam()));
594+
auto sub = node->create_subscription<test_msgs::msg::Strings>(
595+
"topic",
596+
std::get<1>(GetParam()),
597+
do_nothing);
598+
599+
auto msg = std::make_shared<test_msgs::msg::Strings>();
600+
for (int i = 0; i < 20; i++) {
601+
ASSERT_NO_THROW(pub->publish(*msg));
602+
}
603+
EXPECT_TRUE(pub->wait_for_all_acked(std::chrono::milliseconds(500)));
604+
}
605+
606+
INSTANTIATE_TEST_SUITE_P(
607+
TestWaitForAllAckedWithParm,
608+
TestPublisherWaitForAllAcked,
609+
::testing::Values(
610+
std::pair<rclcpp::QoS, rclcpp::QoS>(
611+
rclcpp::QoS(1).reliable(), rclcpp::QoS(1).reliable()),
612+
std::pair<rclcpp::QoS, rclcpp::QoS>(
613+
rclcpp::QoS(1).best_effort(), rclcpp::QoS(1).best_effort()),
614+
std::pair<rclcpp::QoS, rclcpp::QoS>(
615+
rclcpp::QoS(1).reliable(), rclcpp::QoS(1).best_effort())));

0 commit comments

Comments
 (0)