Skip to content

Commit bb43fa6

Browse files
Generate callbacks after updating message_ (#274) (#353)
* Generate callbacks after updating message_ Signed-off-by: ymd-stella <[email protected]> * Add test for multithread TransformListener to test_message_filter Signed-off-by: ymd-stella <[email protected]> Co-authored-by: ymd-stella <[email protected]>
1 parent aa09426 commit bb43fa6

File tree

2 files changed

+59
-51
lines changed

2 files changed

+59
-51
lines changed

test_tf2/test/test_message_filter.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,47 @@ TEST(MessageFilter, postTransforms)
179179

180180
EXPECT_EQ(1, n.count_);
181181
}
182+
183+
TEST(MessageFilter, concurrentTransforms)
184+
{
185+
const int messages = 30;
186+
const int buffer_size = messages;
187+
auto node = rclcpp::Node::make_shared("tf2_ros_message_filter");
188+
auto create_timer_interface = std::make_shared<tf2_ros::CreateTimerROS>(
189+
node->get_node_base_interface(),
190+
node->get_node_timers_interface());
191+
192+
rclcpp::Clock::SharedPtr clock = std::make_shared<rclcpp::Clock>(RCL_SYSTEM_TIME);
193+
194+
builtin_interfaces::msg::Time stamp = tf2_ros::toMsg(tf2::timeFromSec(1));
195+
196+
std::shared_ptr<geometry_msgs::msg::PointStamped> msg = std::make_shared<geometry_msgs::msg::PointStamped>();
197+
msg->header.stamp = stamp;
198+
msg->header.frame_id = "frame2";
199+
200+
tf2_ros::Buffer buffer(clock);
201+
for (int i = 0; i < 50; i++) {
202+
buffer.setCreateTimerInterface(create_timer_interface);
203+
tf2_ros::MessageFilter<geometry_msgs::msg::PointStamped> filter(buffer, "frame1", buffer_size, node);
204+
Notification n(1);
205+
filter.registerCallback(std::bind(&Notification::notify, &n, std::placeholders::_1));
206+
207+
std::thread t([&](){
208+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
209+
buffer.setTransform(createTransform(tf2::Quaternion(0,0,0,1), tf2::Vector3(1,2,3), stamp, "frame1", "frame2"), "me");
210+
});
211+
for (int j = 0; j < messages; j++) {
212+
filter.add(msg);
213+
}
214+
t.join();
215+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
216+
217+
EXPECT_EQ(messages, n.count_);
218+
219+
buffer.clear();
220+
}
221+
}
222+
182223
// TODO (ahcorde): For some unknown reason message_filters::Connection registerFailureCallback is disable
183224
// with #if 0 https://github.com/ros2/geometry2/blob/ros2/tf2_ros/include/tf2_ros/message_filter.h#L463
184225
// rework this part when this is available

tf2_ros/include/tf2_ros/message_filter.h

Lines changed: 18 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi
338338
return;
339339
}
340340

341+
std::vector<std::tuple<uint64_t, tf2::TimePoint, std::string>> wait_params;
341342
// iterate through the target frames and add requests for each of them
342343
MessageInfo info;
343344
info.handles.reserve(expected_success_count_);
@@ -353,63 +354,17 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi
353354
V_string::iterator end = target_frames_copy.end();
354355
for (; it != end; ++it) {
355356
const std::string & target_frame = *it;
356-
auto future = buffer_.waitForTransform(
357-
target_frame,
358-
frame_id,
359-
tf2::timeFromSec(stamp.seconds()),
360-
buffer_timeout_,
361-
std::bind(&MessageFilter::transformReadyCallback, this, std::placeholders::_1, next_handle_index_));
362-
363-
try {
364-
const auto status = future.wait_for(std::chrono::seconds(0));
365-
if (status == std::future_status::ready) {
366-
future.get();
367-
// Transform is available
368-
++info.success_count;
369-
}
370-
else {
371-
info.handles.push_back(next_handle_index_++);
372-
}
373-
}
374-
catch (const std::exception & e) {
375-
TF2_ROS_MESSAGEFILTER_WARN("Message dropped because: %s", e.what());
376-
messageDropped(evt, filter_failure_reasons::OutTheBack);
377-
return;
378-
}
357+
wait_params.emplace_back(next_handle_index_, tf2::timeFromSec(stamp.seconds()), target_frame);
358+
info.handles.push_back(next_handle_index_++);
379359

380360
if (time_tolerance_.nanoseconds()) {
381-
future = buffer_.waitForTransform(
382-
target_frame,
383-
frame_id,
384-
tf2::timeFromSec((stamp + time_tolerance_).seconds()),
385-
buffer_timeout_,
386-
std::bind(&MessageFilter::transformReadyCallback, this, std::placeholders::_1, next_handle_index_));
387-
try {
388-
const auto status = future.wait_for(std::chrono::seconds(0));
389-
if (status == std::future_status::ready) {
390-
future.get();
391-
// Transform is available
392-
++info.success_count;
393-
}
394-
else {
395-
info.handles.push_back(next_handle_index_++);
396-
}
397-
}
398-
catch (const std::exception & e) {
399-
TF2_ROS_MESSAGEFILTER_WARN("Message dropped because: %s", e.what());
400-
messageDropped(evt, filter_failure_reasons::OutTheBack);
401-
return;
402-
}
361+
wait_params.emplace_back(next_handle_index_, tf2::timeFromSec((stamp + time_tolerance_).seconds()), target_frame);
362+
info.handles.push_back(next_handle_index_++);
403363
}
404364
}
405365
}
406366

407-
408-
// We can transform already
409-
if (info.success_count == expected_success_count_) {
410-
messageReady(evt);
411-
} else {
412-
367+
{
413368
// Keep a lock on the messages
414369
std::unique_lock<std::mutex> unique_lock(messages_mutex_);
415370

@@ -439,6 +394,18 @@ class MessageFilter : public MessageFilterBase, public message_filters::SimpleFi
439394
TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d",
440395
frame_id.c_str(), stamp.seconds(), message_count_);
441396
++incoming_message_count_;
397+
398+
for (const auto& param : wait_params) {
399+
const auto& handle = std::get<0>(param);
400+
const auto& stamp = std::get<1>(param);
401+
const auto& target_frame = std::get<2>(param);
402+
buffer_.waitForTransform(
403+
target_frame,
404+
frame_id,
405+
stamp,
406+
buffer_timeout_,
407+
std::bind(&MessageFilter::transformReadyCallback, this, std::placeholders::_1, handle));
408+
}
442409
}
443410

444411
/**

0 commit comments

Comments
 (0)