Skip to content

Commit e0d5aa6

Browse files
nebraszkaprybicki
andauthored
Improve ROS2 node handling with new Ros2Node structure (#254)
- Add a new Ros2Node structure that checks if rclcpp::ok() and acquires Ros2InitGuard - Implement format validation for pointclouds in Ros2PublishPointsNode to ensure they meet the required specifications - Catch pending exceptions in rgl_cleanup - Introduce tests to verify the mentioned changes --------- Co-authored-by: Piotr Rybicki <[email protected]>
1 parent 501695f commit e0d5aa6

File tree

9 files changed

+217
-41
lines changed

9 files changed

+217
-41
lines changed

extensions/ros2/src/graph/NodesRos2.hpp

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,34 @@
2727
#include <Ros2InitGuard.hpp>
2828
#include <radar_msgs/msg/radar_scan.hpp>
2929

30-
struct Ros2PublishPointsNode : IPointsNodeSingleInput
30+
struct Ros2Node : IPointsNodeSingleInput
31+
{
32+
Ros2Node() { ros2InitGuard = Ros2InitGuard::acquire(); }
33+
34+
void enqueueExecImpl() final
35+
{
36+
if (!rclcpp::ok()) {
37+
throw InvalidPipeline("Unable to execute Ros2Node because ROS2 has been shut down.");
38+
}
39+
ros2EnqueueExecImpl();
40+
}
41+
42+
void validateImpl() final
43+
{
44+
IPointsNodeSingleInput::validateImpl();
45+
ros2ValidateImpl();
46+
}
47+
48+
virtual ~Ros2Node() = default;
49+
50+
protected:
51+
std::shared_ptr<Ros2InitGuard> ros2InitGuard;
52+
53+
virtual void ros2EnqueueExecImpl() = 0;
54+
virtual void ros2ValidateImpl() = 0;
55+
};
56+
57+
struct Ros2PublishPointsNode : Ros2Node
3158
{
3259
using Ptr = std::shared_ptr<Ros2PublishPointsNode>;
3360

@@ -36,39 +63,37 @@ struct Ros2PublishPointsNode : IPointsNodeSingleInput
3663
rgl_qos_policy_durability_t qosDurability = QOS_POLICY_DURABILITY_SYSTEM_DEFAULT,
3764
rgl_qos_policy_history_t qosHistory = QOS_POLICY_HISTORY_SYSTEM_DEFAULT, int32_t qosHistoryDepth = 10);
3865

39-
// Node
40-
void validateImpl() override;
41-
void enqueueExecImpl() override;
66+
// Ros2Node
67+
void ros2ValidateImpl() override;
68+
void ros2EnqueueExecImpl() override;
4269

4370
~Ros2PublishPointsNode() override = default;
4471

4572
private:
4673
DeviceAsyncArray<char>::Ptr inputFmtData = DeviceAsyncArray<char>::create(arrayMgr);
4774

48-
std::shared_ptr<Ros2InitGuard> ros2InitGuard;
4975
rclcpp::Publisher<sensor_msgs::msg::PointCloud2>::SharedPtr ros2Publisher;
5076
sensor_msgs::msg::PointCloud2 ros2Message;
5177

5278
void updateRos2Message(const std::vector<rgl_field_t>& fields, bool isDense);
5379
};
5480

5581

56-
struct Ros2PublishPointVelocityMarkersNode : IPointsNodeSingleInput
82+
struct Ros2PublishPointVelocityMarkersNode : Ros2Node
5783
{
5884
using Ptr = std::shared_ptr<Ros2PublishPointVelocityMarkersNode>;
5985

6086
void setParameters(const char* topicName, const char* frameId, rgl_field_t velocityField);
6187
std::vector<rgl_field_t> getRequiredFieldList() const override { return {XYZ_VEC3_F32, velocityField}; }
6288

63-
// Node
64-
void validateImpl() override;
65-
void enqueueExecImpl() override;
89+
// Ros2Node
90+
void ros2ValidateImpl() override;
91+
void ros2EnqueueExecImpl() override;
6692

6793
~Ros2PublishPointVelocityMarkersNode() override = default;
6894

6995
private:
7096
std::string frameId;
71-
std::shared_ptr<Ros2InitGuard> ros2InitGuard;
7297
rclcpp::Publisher<visualization_msgs::msg::Marker>::SharedPtr linesPublisher;
7398
HostPinnedArray<Vec3f>::Ptr pos = HostPinnedArray<Vec3f>::create();
7499
HostPinnedArray<Vec3f>::Ptr vel = HostPinnedArray<Vec3f>::create();
@@ -78,21 +103,22 @@ struct Ros2PublishPointVelocityMarkersNode : IPointsNodeSingleInput
78103
const visualization_msgs::msg::Marker& makeLinesMarker();
79104
};
80105

81-
struct Ros2PublishRadarScanNode : IPointsNodeSingleInput
106+
struct Ros2PublishRadarScanNode : Ros2Node
82107
{
83108
void setParameters(const char* topicName, const char* frameId, rgl_qos_policy_reliability_t qosReliability,
84109
rgl_qos_policy_durability_t qosDurability, rgl_qos_policy_history_t qosHistory, int32_t qosHistoryDepth);
85110
std::vector<rgl_field_t> getRequiredFieldList() const override
86111
{
87112
return {DISTANCE_F32, AZIMUTH_F32, ELEVATION_F32, RADIAL_SPEED_F32, /* placeholder for amplitude */ PADDING_32};
88113
}
89-
void validateImpl() override;
90-
void enqueueExecImpl() override;
114+
115+
// Ros2Node
116+
void ros2ValidateImpl() override;
117+
void ros2EnqueueExecImpl() override;
91118

92119
private:
93120
radar_msgs::msg::RadarScan ros2Message;
94121
rclcpp::Publisher<radar_msgs::msg::RadarScan>::SharedPtr ros2Publisher;
95122
DeviceAsyncArray<char>::Ptr formattedData = DeviceAsyncArray<char>::create(arrayMgr);
96-
std::shared_ptr<Ros2InitGuard> ros2InitGuard;
97123
GPUFieldDescBuilder fieldDescBuilder;
98124
};

extensions/ros2/src/graph/Ros2PublishPointVelocityMarkersNode.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,28 +24,26 @@ void Ros2PublishPointVelocityMarkersNode::setParameters(const char* topicName, c
2424
throw InvalidAPIArgument(msg);
2525
}
2626
this->frameId = frameId;
27-
ros2InitGuard = Ros2InitGuard::acquire();
2827
auto qos = rclcpp::QoS(10); // Use system default QoS
2928
linesPublisher = ros2InitGuard->createUniquePublisher<visualization_msgs::msg::Marker>(topicName, qos);
3029
this->velocityField = velocityField;
3130
}
3231

33-
void Ros2PublishPointVelocityMarkersNode::validateImpl()
32+
void Ros2PublishPointVelocityMarkersNode::ros2ValidateImpl()
3433
{
35-
IPointsNodeSingleInput::validateImpl();
3634
if (!input->isDense()) {
3735
throw InvalidPipeline(fmt::format("{} requires a compacted point cloud (dense)", getName()));
3836
}
37+
38+
if (!input->hasField(RGL_FIELD_DYNAMIC_FORMAT)) {
39+
auto msg = fmt::format("{} requires a formatted point cloud", getName());
40+
throw InvalidPipeline(msg);
41+
}
3942
}
40-
void Ros2PublishPointVelocityMarkersNode::enqueueExecImpl()
43+
void Ros2PublishPointVelocityMarkersNode::ros2EnqueueExecImpl()
4144
{
4245
pos->copyFrom(input->getFieldData(RGL_FIELD_XYZ_VEC3_F32));
4346
vel->copyFrom(input->getFieldData(velocityField));
44-
45-
if (!rclcpp::ok()) {
46-
// TODO: This should be handled by the Graph.
47-
throw std::runtime_error("Unable to publish a message because ROS2 has been shut down.");
48-
}
4947
linesPublisher->publish(makeLinesMarker());
5048
}
5149

extensions/ros2/src/graph/Ros2PublishPointsNode.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,29 @@ void Ros2PublishPointsNode::setParameters(const char* topicName, const char* fra
2121
rgl_qos_policy_durability_t qosDurability, rgl_qos_policy_history_t qosHistory,
2222
int32_t qosHistoryDepth)
2323
{
24-
ros2InitGuard = Ros2InitGuard::acquire();
25-
2624
ros2Message.header.frame_id = frameId;
27-
2825
auto qos = rclcpp::QoS(qosHistoryDepth);
2926
qos.reliability(static_cast<rmw_qos_reliability_policy_t>(qosReliability));
3027
qos.durability(static_cast<rmw_qos_durability_policy_t>(qosDurability));
3128
qos.history(static_cast<rmw_qos_history_policy_t>(qosHistory));
3229
ros2Publisher = ros2InitGuard->createUniquePublisher<sensor_msgs::msg::PointCloud2>(topicName, qos);
3330
}
3431

35-
void Ros2PublishPointsNode::validateImpl()
32+
void Ros2PublishPointsNode::ros2ValidateImpl()
3633
{
37-
IPointsNodeSingleInput::validateImpl();
3834
if (input->getHeight() != 1) {
3935
throw InvalidPipeline("ROS2 publish supports unorganized pointclouds only");
4036
}
37+
38+
if (!input->hasField(RGL_FIELD_DYNAMIC_FORMAT)) {
39+
auto msg = fmt::format("{} requires a formatted point cloud", getName());
40+
throw InvalidPipeline(msg);
41+
}
42+
4143
updateRos2Message(input->getRequiredFieldList(), input->isDense());
4244
}
4345

44-
void Ros2PublishPointsNode::enqueueExecImpl()
46+
void Ros2PublishPointsNode::ros2EnqueueExecImpl()
4547
{
4648
auto fieldData = input->getFieldData(RGL_FIELD_DYNAMIC_FORMAT)->asTyped<char>()->asSubclass<HostArray>();
4749
int count = input->getPointCount();
@@ -57,10 +59,6 @@ void Ros2PublishPointsNode::enqueueExecImpl()
5759
ros2Message.header.stamp = Scene::instance().getTime().has_value() ?
5860
Scene::instance().getTime()->asRos2Msg() :
5961
static_cast<builtin_interfaces::msg::Time>(ros2InitGuard->getNode().get_clock()->now());
60-
if (!rclcpp::ok()) {
61-
// TODO: This should be handled by the Graph.
62-
throw std::runtime_error("Unable to publish a message because ROS2 has been shut down.");
63-
}
6462
ros2Publisher->publish(ros2Message);
6563
}
6664

extensions/ros2/src/graph/Ros2PublishRadarScanNode.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,22 @@ void Ros2PublishRadarScanNode::setParameters(const char* topicName, const char*
2020
rgl_qos_policy_durability_t qosDurability, rgl_qos_policy_history_t qosHistory,
2121
int32_t qosHistoryDepth)
2222
{
23-
ros2InitGuard = Ros2InitGuard::acquire();
24-
2523
ros2Message.header.frame_id = frameId;
26-
2724
auto qos = rclcpp::QoS(qosHistoryDepth);
2825
qos.reliability(static_cast<rmw_qos_reliability_policy_t>(qosReliability));
2926
qos.durability(static_cast<rmw_qos_durability_policy_t>(qosDurability));
3027
qos.history(static_cast<rmw_qos_history_policy_t>(qosHistory));
3128
ros2Publisher = ros2InitGuard->createUniquePublisher<radar_msgs::msg::RadarScan>(topicName, qos);
3229
}
3330

34-
void Ros2PublishRadarScanNode::validateImpl()
31+
void Ros2PublishRadarScanNode::ros2ValidateImpl()
3532
{
36-
IPointsNodeSingleInput::validateImpl();
3733
if (input->getHeight() != 1) {
3834
throw InvalidPipeline("ROS2 radar publish supports unorganized pointclouds only");
3935
}
4036
}
4137

42-
void Ros2PublishRadarScanNode::enqueueExecImpl()
38+
void Ros2PublishRadarScanNode::ros2EnqueueExecImpl()
4339
{
4440
ros2Message.header.stamp = Scene::instance().getTime().has_value() ?
4541
Scene::instance().getTime().value().asRos2Msg() :

src/api/apiCore.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,14 @@ RGL_API rgl_status_t rgl_cleanup(void)
140140
while (!Node::instances.empty()) {
141141
auto node = Node::instances.begin()->second;
142142
if (node->hasGraphRunCtx()) {
143-
node->getGraphRunCtx()->detachAndDestroy();
143+
try {
144+
// This iterates over all nodes and may trigger pending exceptions
145+
node->getGraphRunCtx()->detachAndDestroy();
146+
}
147+
catch (std::exception& e) {
148+
RGL_WARN("rgl_cleanup: caught pending exception in Node {}: {}", node->getName(), e.what());
149+
continue; // Some node has thrown exception so this graph has not been detached, try again
150+
}
144151
}
145152
auto connectedNodes = node->disconnectConnectedNodes();
146153
for (auto&& nodeToRelease : connectedNodes) {

test/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ set(RGL_TEST_FILES
5151

5252
if (RGL_BUILD_ROS2_EXTENSION)
5353
list(APPEND RGL_TEST_FILES
54+
src/graph/Ros2NodesTest.cpp
5455
src/graph/nodes/Ros2PublishRadarScanNodeTest.cpp
56+
src/graph/nodes/Ros2PublishPointsNodeTest.cpp
5557
src/scene/rcsAngleDistributionTest.cpp
5658
)
5759
endif()

test/src/graph/Ros2NodesTest.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include <helpers/commonHelpers.hpp>
2+
#include <helpers/graphHelpers.hpp>
3+
#include <helpers/testPointCloud.hpp>
4+
5+
#include <rgl/api/extensions/ros2.h>
6+
7+
#include <rclcpp/rclcpp.hpp>
8+
9+
class Ros2NodesTest : public RGLTest
10+
{};
11+
12+
TEST_F(Ros2NodesTest, should_throw_invalid_pipeline_when_ros2_shutdown)
13+
{
14+
std::vector<rgl_field_t> fields{XYZ_VEC3_F32, DISTANCE_F32, AZIMUTH_F32, ELEVATION_F32, RADIAL_SPEED_F32};
15+
TestPointCloud pointCloud(fields, 10);
16+
rgl_node_t points = pointCloud.createUsePointsNode();
17+
18+
rgl_node_t format = nullptr;
19+
EXPECT_RGL_SUCCESS(rgl_node_points_format(&format, fields.data(), fields.size()));
20+
21+
rgl_node_t ros2pub = nullptr;
22+
EXPECT_RGL_SUCCESS(rgl_node_points_ros2_publish(&ros2pub, "pointcloud", "rglFrame"));
23+
24+
rgl_node_t ros2pubWithQos = nullptr;
25+
rgl_qos_policy_reliability_t qos_r = QOS_POLICY_RELIABILITY_BEST_EFFORT;
26+
rgl_qos_policy_durability_t qos_d = QOS_POLICY_DURABILITY_VOLATILE;
27+
rgl_qos_policy_history_t qos_h = QOS_POLICY_HISTORY_KEEP_LAST;
28+
EXPECT_RGL_SUCCESS(
29+
rgl_node_points_ros2_publish_with_qos(&ros2pubWithQos, "pointcloud_ex", "rglFrame", qos_r, qos_d, qos_h, 10));
30+
31+
rgl_node_t ros2radarscan = nullptr;
32+
EXPECT_RGL_SUCCESS(rgl_node_publish_ros2_radarscan(&ros2radarscan, "radarscan", "rglFrame", qos_r, qos_d, qos_h, 10));
33+
34+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(points, format));
35+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(format, ros2pub));
36+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(format, ros2pubWithQos));
37+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(points, ros2radarscan));
38+
39+
EXPECT_RGL_SUCCESS(rgl_graph_run(points));
40+
41+
rclcpp::shutdown();
42+
43+
EXPECT_RGL_INVALID_PIPELINE(rgl_graph_run(points), "Unable to execute Ros2Node because ROS2 has been shut down.");
44+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include <helpers/commonHelpers.hpp>
2+
#include <helpers/graphHelpers.hpp>
3+
#include <helpers/testPointCloud.hpp>
4+
5+
#include <rgl/api/extensions/ros2.h>
6+
7+
#include <rclcpp/rclcpp.hpp>
8+
9+
class Ros2PublishPointsNodeTest : public RGLTest
10+
{};
11+
12+
TEST_F(Ros2PublishPointsNodeTest, should_throw_invalid_pipeline_when_ros2_shutdown)
13+
{
14+
std::vector<rgl_field_t> fields{XYZ_VEC3_F32};
15+
TestPointCloud pointCloud(fields, 10);
16+
rgl_node_t points = pointCloud.createUsePointsNode();
17+
18+
rgl_node_t format = nullptr;
19+
EXPECT_RGL_SUCCESS(rgl_node_points_format(&format, fields.data(), fields.size()));
20+
21+
rgl_node_t ros2pub = nullptr;
22+
23+
EXPECT_RGL_SUCCESS(rgl_node_points_ros2_publish(&ros2pub, "pointcloud", "rglFrame"));
24+
25+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(points, format));
26+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(format, ros2pub));
27+
28+
EXPECT_RGL_SUCCESS(rgl_graph_run(points));
29+
30+
rclcpp::shutdown();
31+
EXPECT_RGL_INVALID_PIPELINE(rgl_graph_run(points), "Unable to execute Ros2Node because ROS2 has been shut down.");
32+
}
33+
34+
TEST_F(Ros2PublishPointsNodeTest, should_throw_invalid_pipeline_when_ros2_shutdown_with_qos)
35+
{
36+
std::vector<rgl_field_t> fields{XYZ_VEC3_F32};
37+
TestPointCloud pointCloud(fields, 10);
38+
rgl_node_t points = pointCloud.createUsePointsNode();
39+
40+
rgl_node_t format = nullptr;
41+
EXPECT_RGL_SUCCESS(rgl_node_points_format(&format, fields.data(), fields.size()));
42+
43+
rgl_node_t ros2pubWithQos = nullptr;
44+
rgl_qos_policy_reliability_t qos_r = QOS_POLICY_RELIABILITY_BEST_EFFORT;
45+
rgl_qos_policy_durability_t qos_d = QOS_POLICY_DURABILITY_VOLATILE;
46+
rgl_qos_policy_history_t qos_h = QOS_POLICY_HISTORY_KEEP_LAST;
47+
48+
EXPECT_RGL_SUCCESS(
49+
rgl_node_points_ros2_publish_with_qos(&ros2pubWithQos, "pointcloud_ex", "rglFrame", qos_r, qos_d, qos_h, 10));
50+
51+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(points, format));
52+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(format, ros2pubWithQos));
53+
54+
EXPECT_RGL_SUCCESS(rgl_graph_run(points));
55+
56+
rclcpp::shutdown();
57+
EXPECT_RGL_INVALID_PIPELINE(rgl_graph_run(points), "Unable to execute Ros2Node because ROS2 has been shut down.");
58+
}
59+
60+
61+
TEST_F(Ros2PublishPointsNodeTest, should_throw_invalid_pipeline_when_no_formatted_point_cloud)
62+
{
63+
std::vector<rgl_field_t> fields{XYZ_VEC3_F32};
64+
TestPointCloud pointCloud(fields, 10);
65+
rgl_node_t points = pointCloud.createUsePointsNode();
66+
67+
rgl_node_t ros2pub = nullptr;
68+
EXPECT_RGL_SUCCESS(rgl_node_points_ros2_publish(&ros2pub, "pointcloud", "rglFrame"));
69+
70+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(points, ros2pub));
71+
72+
EXPECT_RGL_INVALID_PIPELINE(rgl_graph_run(ros2pub), "requires a formatted point cloud");
73+
74+
rgl_node_t ros2pubWithQos = nullptr;
75+
rgl_qos_policy_reliability_t qos_r = QOS_POLICY_RELIABILITY_BEST_EFFORT;
76+
rgl_qos_policy_durability_t qos_d = QOS_POLICY_DURABILITY_VOLATILE;
77+
rgl_qos_policy_history_t qos_h = QOS_POLICY_HISTORY_KEEP_LAST;
78+
79+
EXPECT_RGL_SUCCESS(
80+
rgl_node_points_ros2_publish_with_qos(&ros2pubWithQos, "pointcloud_ex", "rglFrame", qos_r, qos_d, qos_h, 10));
81+
82+
EXPECT_RGL_SUCCESS(rgl_graph_node_add_child(points, ros2pubWithQos));
83+
84+
EXPECT_RGL_INVALID_PIPELINE(rgl_graph_run(ros2pubWithQos), "requires a formatted point cloud");
85+
}

0 commit comments

Comments
 (0)