Skip to content

Commit c47affa

Browse files
authored
Add new API for the RealtimePublisher (#323)
1 parent 077324d commit c47affa

File tree

5 files changed

+132
-21
lines changed

5 files changed

+132
-21
lines changed

doc/migration.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ This list summarizes important changes between Jazzy (previous) and Kilted (curr
77
RealtimeBox
88
*******************************
99
* ``RealtimeBox`` is deprecated. Update your code to use ``realtime_thread_safe_box.hpp`` header, and class name ``RealtimeThreadSafeBox`` instead. (`#318 <https://github.com/ros-controls/realtime_tools/pull/318>`__, `#342 <https://github.com/ros-controls/realtime_tools/pull/342>`__).
10+
* ``RealtimePublisher`` is updated with a new ``try_publish`` API. Update your code with a local message variable and call ``try_publish`` with that variable. The old API is deprecated and will be removed in a future release. (`#323 <https://github.com/ros-controls/realtime_tools/pull/323>`__).

doc/release_notes.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ This list summarizes the changes between Jazzy (previous) and Kilted (current) r
77
RealtimeBox
88
*******************************
99
* ``RealtimeBox`` got renamed to ``RealtimeThreadSafeBox`` and uses real-time mutexes now (`#318 <https://github.com/ros-controls/realtime_tools/pull/318>`__, `#342 <https://github.com/ros-controls/realtime_tools/pull/342>`__).
10+
* ``RealtimePublisher`` is updated with a new ``try_publish`` API and few methods are deprecated (`#323 <https://github.com/ros-controls/realtime_tools/pull/323>`__).

realtime_tools/doc/realtime_publisher.rst

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The ``realtime_tools::RealtimePublisher`` allows users that write C++ ros2_contr
1212
private:
1313
std::shared_ptr<realtime_tools::RealtimePublisher<my_msgs::msg::MyMsg>> state_publisher_;
1414
std::shared_ptr<rclcpp::Publisher<my_msgs::msg::MyMsg>> s_publisher_;
15+
my_msgs::msg::MyMsg some_msg_;
1516
}
1617
1718
controller_interface::CallbackReturn MyController::on_configure(
@@ -21,7 +22,8 @@ The ``realtime_tools::RealtimePublisher`` allows users that write C++ ros2_contr
2122
s_publisher_ = get_node()->create_publisher<my_msgs::msg::MyMsg>(
2223
"~/status", rclcpp::SystemDefaultsQoS());
2324
state_publisher_ =
24-
std::make_unique<realtime_tools::RealtimePublisher<ControllerStateMsg>>(s_publisher_);
25+
std::make_unique<realtime_tools::RealtimePublisher<my_msgs::msg::MyMsg>>(s_publisher_);
26+
some_msg_.header.frame_id = params_.frame_id;
2527
...
2628
}
2729
@@ -30,7 +32,8 @@ The ``realtime_tools::RealtimePublisher`` allows users that write C++ ros2_contr
3032
{
3133
...
3234
// Publish controller state
33-
state_publisher_->lock();
34-
state_publisher_->msg_ = some_msg;
35-
state_publisher_->unlockAndPublish();
35+
some_msg_.header.stamp = get_node()->now();
36+
// Fill in the rest of the message
37+
....
38+
state_publisher_->try_publish(some_msg_);
3639
}

realtime_tools/include/realtime_tools/realtime_publisher.hpp

Lines changed: 81 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <mutex>
4545
#include <string>
4646
#include <thread>
47+
#include <utility>
4748

4849
#include "rclcpp/publisher.hpp"
4950

@@ -62,7 +63,8 @@ class RealtimePublisher
6263

6364
RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher<MessageT>)
6465

65-
/// The msg_ variable contains the data that will get published on the ROS topic.
66+
[[deprecated(
67+
"This variable is deprecated, it is recommended to use the try_publish() method instead.")]]
6668
MessageT msg_;
6769

6870
/**
@@ -127,33 +129,66 @@ class RealtimePublisher
127129
*
128130
* \return true if the lock was successfully acquired, false otherwise
129131
*/
132+
[[deprecated(
133+
"Use try_publish() method instead of this method. This method may be removed in future "
134+
"versions.")]]
130135
bool trylock()
131136
{
132-
if (turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock()) {
137+
return turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock();
138+
}
139+
140+
/**
141+
* \brief Check if the realtime publisher is in a state to publish messages
142+
* \return true if the publisher is in a state to publish messages
143+
* \note The msg_ variable can be safely accessed if this function returns true
144+
*/
145+
bool can_publish() const
146+
{
147+
std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
148+
return can_publish(lock);
149+
}
150+
151+
/**
152+
* \brief Try to publish the given message
153+
*
154+
* This method attempts to publish the given message if the publisher is in a state to do so.
155+
* It uses a try_lock to avoid blocking if the mutex is already held by another thread.
156+
*
157+
* \param [in] msg The message to publish
158+
* \return true if the message was successfully published, false otherwise
159+
*/
160+
bool try_publish(const MessageT & msg)
161+
{
162+
std::unique_lock<std::mutex> lock(msg_mutex_, std::try_to_lock);
163+
if (can_publish(lock)) {
164+
{
165+
std::unique_lock<std::mutex> scoped_lock(std::move(lock));
166+
msg_ = msg;
167+
turn_.store(State::NON_REALTIME, std::memory_order_release);
168+
}
169+
updated_cond_.notify_one(); // Notify the publishing thread
133170
return true;
134-
} else {
135-
return false;
136171
}
172+
return false;
137173
}
138174

139175
/**
140-
* \brief Try to get the data lock from realtime and publish the given message
176+
* \brief Try to publish the given message (deprecated)
177+
* \deprecated This method is deprecated and should be replaced with try_publish()
141178
*
142-
* Tries to gain unique access to msg_ variable. If this succeeds
143-
* update the msg_ variable and call unlockAndPublish
179+
* This method is deprecated and should be replaced with try_publish().
180+
* It attempts to publish the given message if the publisher is in a state to do so.
181+
* It uses a try_lock to avoid blocking if the mutex is already held by another thread.
144182
*
145183
* \param [in] msg The message to publish
146-
* \return false in case no lock for the realtime variable is acquired. This implies the message will not be published.
184+
* \return true if the message was successfully published, false otherwise
147185
*/
186+
[[deprecated(
187+
"Use try_publish() method instead of this method. This method may be removed in future "
188+
"versions.")]]
148189
bool tryPublish(const MessageT & msg)
149190
{
150-
if (!trylock()) {
151-
return false;
152-
}
153-
154-
msg_ = msg;
155-
unlockAndPublish();
156-
return true;
191+
return try_publish(msg);
157192
}
158193

159194
/**
@@ -163,6 +198,9 @@ class RealtimePublisher
163198
* variable, the lock has to be released for the message to get
164199
* published on the specified topic.
165200
*/
201+
[[deprecated(
202+
"Use the try_publish() method to publish the message instead of using this method. This method "
203+
"may be removed in future versions.")]]
166204
void unlockAndPublish()
167205
{
168206
turn_.store(State::NON_REALTIME, std::memory_order_release);
@@ -175,12 +213,21 @@ class RealtimePublisher
175213
* This blocking call acquires exclusive access to the msg_ variable.
176214
* Use trylock() for non-blocking attempts to acquire the lock.
177215
*/
178-
void lock() { msg_mutex_.lock(); }
216+
[[deprecated(
217+
"Use the try_publish() method to publish the message instead of using this method. This method "
218+
"may be removed in future versions.")]]
219+
void lock()
220+
{
221+
msg_mutex_.lock();
222+
}
179223

180224
/**
181225
* \brief Unlocks the data without publishing anything
182226
*
183227
*/
228+
[[deprecated(
229+
"Use the try_publish() method to publish the message instead of using this method. This method "
230+
"may be removed in future versions.")]]
184231
void unlock()
185232
{
186233
msg_mutex_.unlock();
@@ -191,7 +238,24 @@ class RealtimePublisher
191238

192239
const std::thread & get_thread() const { return thread_; }
193240

241+
const MessageT & get_msg() const { return msg_; }
242+
243+
std::mutex & get_mutex() { return msg_mutex_; }
244+
245+
const std::mutex & get_mutex() const { return msg_mutex_; }
246+
194247
private:
248+
/**
249+
* \brief Check if the realtime publisher is in a state to publish messages
250+
* \param lock A unique_lock that is already acquired on the msg_mutex_
251+
* \return true if the publisher is in a state to publish messages
252+
* \note The msg_ variable can be safely accessed if this function returns true
253+
*/
254+
bool can_publish(std::unique_lock<std::mutex> & lock) const
255+
{
256+
return turn_.load(std::memory_order_acquire) == State::REALTIME && lock.owns_lock();
257+
}
258+
195259
// non-copyable
196260
RealtimePublisher(const RealtimePublisher &) = delete;
197261
RealtimePublisher & operator=(const RealtimePublisher &) = delete;
@@ -238,7 +302,7 @@ class RealtimePublisher
238302

239303
std::thread thread_;
240304

241-
std::mutex msg_mutex_; // Protects msg_
305+
mutable std::mutex msg_mutex_; // Protects msg_
242306
std::condition_variable updated_cond_;
243307

244308
enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };

realtime_tools/test/realtime_publisher_tests.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,45 @@ TEST(RealtimePublisher, rt_try_publish)
130130
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
131131
rclcpp::shutdown();
132132
}
133+
134+
TEST(RealtimePublisher, rt_can_try_publish)
135+
{
136+
rclcpp::init(0, nullptr);
137+
const size_t ATTEMPTS = 10;
138+
const std::chrono::milliseconds DELAY(250);
139+
140+
const char * expected_msg = "Hello World";
141+
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
142+
rclcpp::QoS qos(10);
143+
qos.reliable().transient_local();
144+
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
145+
RealtimePublisher<StringMsg> rt_pub(pub);
146+
ASSERT_TRUE(rt_pub.can_publish());
147+
148+
// try publish a latched message
149+
bool publish_success = false;
150+
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
151+
StringMsg msg;
152+
msg.string_value = expected_msg;
153+
154+
if (rt_pub.can_publish()) {
155+
ASSERT_TRUE(rt_pub.try_publish(msg));
156+
publish_success = true;
157+
}
158+
std::this_thread::sleep_for(DELAY);
159+
}
160+
ASSERT_TRUE(publish_success);
161+
162+
// make sure subscriber gets it
163+
StringCallback str_callback;
164+
165+
auto sub = node->create_subscription<StringMsg>(
166+
"~/rt_publish", qos,
167+
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
168+
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
169+
rclcpp::spin_some(node);
170+
std::this_thread::sleep_for(DELAY);
171+
}
172+
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
173+
rclcpp::shutdown();
174+
}

0 commit comments

Comments
 (0)