From cc484b6fe7cefda2791985c140260b6aae466e97 Mon Sep 17 00:00:00 2001 From: Chuck-Ellison Date: Fri, 15 Aug 2025 17:12:13 -0500 Subject: [PATCH] Create pointcloud2 subscriber --- .gitignore | 1 + CMakeLists.txt | 11 + README.md | 1 + include/web_video_server/image_streamer.hpp | 45 +- include/web_video_server/jpeg_streamers.hpp | 4 +- include/web_video_server/libav_streamer.hpp | 2 +- include/web_video_server/png_streamers.hpp | 4 +- .../ros_compressed_streamer.hpp | 8 +- .../image_transport_subscriber.hpp | 36 + .../subscribers/pointcloud2_subscriber.hpp | 80 ++ .../subscribers/ros_subscriber.hpp | 49 ++ include/web_video_server/web_video_server.hpp | 5 +- log/COLCON_IGNORE | 0 package.xml | 5 + src/image_streamer.cpp | 107 +-- src/jpeg_streamers.cpp | 8 +- src/libav_streamer.cpp | 4 +- src/png_streamers.cpp | 10 +- src/ros_compressed_streamer.cpp | 17 +- .../image_transport_subscriber.cpp | 61 ++ src/subscribers/pointcloud2_subscriber.cpp | 746 ++++++++++++++++++ src/subscribers/ros_subscriber.cpp | 51 ++ src/vp8_streamer.cpp | 1 + src/vp9_streamer.cpp | 1 + src/web_video_server.cpp | 131 ++- 25 files changed, 1242 insertions(+), 146 deletions(-) create mode 100644 include/web_video_server/subscribers/image_transport_subscriber.hpp create mode 100644 include/web_video_server/subscribers/pointcloud2_subscriber.hpp create mode 100644 include/web_video_server/subscribers/ros_subscriber.hpp create mode 100644 log/COLCON_IGNORE create mode 100644 src/subscribers/image_transport_subscriber.cpp create mode 100644 src/subscribers/pointcloud2_subscriber.cpp create mode 100644 src/subscribers/ros_subscriber.cpp diff --git a/.gitignore b/.gitignore index 86b18f5..931a9bb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .cproject .project +.vscode/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 766b4a2..98b4a9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,6 +14,11 @@ find_package(rclcpp REQUIRED) find_package(rclcpp_components REQUIRED) find_package(sensor_msgs REQUIRED) +find_package(tf2 REQUIRED) +find_package(tf2_ros REQUIRED) +find_package(tf2_sensor_msgs REQUIRED) +find_package(tf2_geometry_msgs REQUIRED) + find_package(OpenCV REQUIRED) find_package(Boost REQUIRED COMPONENTS system) @@ -60,8 +65,14 @@ add_library(${PROJECT_NAME} SHARED src/jpeg_streamers.cpp src/png_streamers.cpp src/utils.cpp + + src/subscribers/ros_subscriber.cpp + src/subscribers/pointcloud2_subscriber.cpp + src/subscribers/image_transport_subscriber.cpp ) +ament_target_dependencies(${PROJECT_NAME} rclcpp tf2 tf2_ros tf2_sensor_msgs tf2_geometry_msgs) + ## Specify libraries to link a library or executable target against target_link_libraries(${PROJECT_NAME} async_web_server_cpp::async_web_server_cpp diff --git a/README.md b/README.md index 2738767..bb40d4b 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ This node provides HTTP streaming of ROS image topics in various formats, making - Web interface to browse available image topics - Single image snapshot capability - Support for different QoS profiles in ROS 2 +- Support for PointCloud2 topics (converted to depth images) ## Installation diff --git a/include/web_video_server/image_streamer.hpp b/include/web_video_server/image_streamer.hpp index 0e001c5..6df3b05 100644 --- a/include/web_video_server/image_streamer.hpp +++ b/include/web_video_server/image_streamer.hpp @@ -37,12 +37,13 @@ #include #include "rclcpp/rclcpp.hpp" -#include "image_transport/image_transport.hpp" -#include "image_transport/transport_hints.hpp" -#include "web_video_server/utils.hpp" + #include "async_web_server_cpp/http_server.hpp" #include "async_web_server_cpp/http_request.hpp" +#include "web_video_server/subscribers/pointcloud2_subscriber.hpp" +#include "web_video_server/subscribers/image_transport_subscriber.hpp" + namespace web_video_server { @@ -54,9 +55,11 @@ class ImageStreamer async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node); - virtual void start() = 0; + virtual void start(); virtual ~ImageStreamer(); + std::map > subscriber_types_; + bool isInactive() { return inactive_; @@ -65,7 +68,7 @@ class ImageStreamer /** * Restreams the last received image frame if older than max_age. */ - virtual void restreamFrame(std::chrono::duration max_age) = 0; + virtual void restreamFrame(std::chrono::duration max_age); std::string getTopic() { @@ -76,44 +79,24 @@ class ImageStreamer async_web_server_cpp::HttpConnectionPtr connection_; async_web_server_cpp::HttpRequest request_; rclcpp::Node::SharedPtr node_; - bool inactive_; - image_transport::Subscriber image_sub_; - std::string topic_; -}; - - -class ImageTransportImageStreamer : public ImageStreamer -{ -public: - ImageTransportImageStreamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); - virtual ~ImageTransportImageStreamer(); + std::shared_ptr subscriber_; - virtual void start(); - -protected: virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); - virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time) = 0; - virtual void restreamFrame(std::chrono::duration max_age); + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point &time) = 0; virtual void initialize(const cv::Mat &); - image_transport::Subscriber image_sub_; + std::string topic_; + bool inactive_; int output_width_; int output_height_; bool invert_; - std::string default_transport_; - std::string qos_profile_name_; + bool initialized_; std::chrono::steady_clock::time_point last_frame_; - cv::Mat output_size_image; + cv::Mat output_size_image_; std::mutex send_mutex_; private: - image_transport::ImageTransport it_; - bool initialized_; - void imageCallback(const sensor_msgs::msg::Image::ConstSharedPtr & msg); }; diff --git a/include/web_video_server/jpeg_streamers.hpp b/include/web_video_server/jpeg_streamers.hpp index 364357f..d837140 100644 --- a/include/web_video_server/jpeg_streamers.hpp +++ b/include/web_video_server/jpeg_streamers.hpp @@ -42,7 +42,7 @@ namespace web_video_server { -class MjpegStreamer : public ImageTransportImageStreamer +class MjpegStreamer : public ImageStreamer { public: MjpegStreamer( @@ -69,7 +69,7 @@ class MjpegStreamerType : public ImageStreamerType std::string create_viewer(const async_web_server_cpp::HttpRequest & request); }; -class JpegSnapshotStreamer : public ImageTransportImageStreamer +class JpegSnapshotStreamer : public ImageStreamer { public: JpegSnapshotStreamer( diff --git a/include/web_video_server/libav_streamer.hpp b/include/web_video_server/libav_streamer.hpp index 12914bb..4cabb47 100644 --- a/include/web_video_server/libav_streamer.hpp +++ b/include/web_video_server/libav_streamer.hpp @@ -54,7 +54,7 @@ extern "C" namespace web_video_server { -class LibavStreamer : public ImageTransportImageStreamer +class LibavStreamer : public ImageStreamer { public: LibavStreamer( diff --git a/include/web_video_server/png_streamers.hpp b/include/web_video_server/png_streamers.hpp index c261fe6..488b9f2 100644 --- a/include/web_video_server/png_streamers.hpp +++ b/include/web_video_server/png_streamers.hpp @@ -41,7 +41,7 @@ namespace web_video_server { -class PngStreamer : public ImageTransportImageStreamer +class PngStreamer : public ImageStreamer { public: PngStreamer( @@ -69,7 +69,7 @@ class PngStreamerType : public ImageStreamerType std::string create_viewer(const async_web_server_cpp::HttpRequest & request); }; -class PngSnapshotStreamer : public ImageTransportImageStreamer +class PngSnapshotStreamer : public ImageStreamer { public: PngSnapshotStreamer( diff --git a/include/web_video_server/ros_compressed_streamer.hpp b/include/web_video_server/ros_compressed_streamer.hpp index 9fc51bd..44ad91d 100644 --- a/include/web_video_server/ros_compressed_streamer.hpp +++ b/include/web_video_server/ros_compressed_streamer.hpp @@ -54,17 +54,16 @@ class RosCompressedStreamer : public ImageStreamer virtual void restreamFrame(std::chrono::duration max_age); protected: - virtual void sendImage( + virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); + virtual void sendCompressedImage( const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, const std::chrono::steady_clock::time_point & time); private: - void imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg); + void compressedImageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg); MultipartStream stream_; rclcpp::Subscription::SharedPtr image_sub_; - std::chrono::steady_clock::time_point last_frame_; sensor_msgs::msg::CompressedImage::ConstSharedPtr last_msg; - std::mutex send_mutex_; std::string qos_profile_name_; }; @@ -75,6 +74,7 @@ class RosCompressedStreamerType : public ImageStreamerType const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node); + std::string create_viewer(const async_web_server_cpp::HttpRequest & request); }; diff --git a/include/web_video_server/subscribers/image_transport_subscriber.hpp b/include/web_video_server/subscribers/image_transport_subscriber.hpp new file mode 100644 index 0000000..b4ceb66 --- /dev/null +++ b/include/web_video_server/subscribers/image_transport_subscriber.hpp @@ -0,0 +1,36 @@ + +#pragma once + +#include +#include + +#include + +namespace web_video_server +{ + +class ImageTransportSubscriber : public RosSubscriber +{ + public: + ImageTransportSubscriber(rclcpp::Node::SharedPtr node); + + ~ImageTransportSubscriber(); + + virtual void subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const ImageCallback& callback); + + void subscriberCallback(const sensor_msgs::msg::Image::ConstSharedPtr &input_msg); + + private: + image_transport::Subscriber ros_sub_; + +}; + +class ImageTransportSubscriberType : public SubscriberType +{ + public: + std::shared_ptr create_subscriber(rclcpp::Node::SharedPtr node); +}; + +} //web_video_server diff --git a/include/web_video_server/subscribers/pointcloud2_subscriber.hpp b/include/web_video_server/subscribers/pointcloud2_subscriber.hpp new file mode 100644 index 0000000..dc81acf --- /dev/null +++ b/include/web_video_server/subscribers/pointcloud2_subscriber.hpp @@ -0,0 +1,80 @@ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "web_video_server/subscribers/ros_subscriber.hpp" + +#ifdef CV_BRIDGE_USES_OLD_HEADERS +#include +#else +#include +#endif + + +namespace web_video_server +{ + +class PointCloud2Subscriber : public RosSubscriber +{ + public: + PointCloud2Subscriber(rclcpp::Node::SharedPtr node); + + ~PointCloud2Subscriber(); + + virtual void subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const ImageCallback& callback); + + void subscriberCallback(const sensor_msgs::msg::PointCloud2::ConstSharedPtr &input_msg); + + static bool compareFieldsOffset(sensor_msgs::msg::PointField& field1, sensor_msgs::msg::PointField& field2); + static inline int sizeOfPointField(int datatype); + + bool FindFields(const sensor_msgs::msg::PointCloud2::ConstSharedPtr &input_msg, sensor_msgs::msg::PointField &userField, + sensor_msgs::msg::PointField &xField, sensor_msgs::msg::PointField &yField, sensor_msgs::msg::PointField &zField); + + sensor_msgs::msg::PointCloud2 TransformFrame(const sensor_msgs::msg::PointCloud2::ConstSharedPtr &input_msg, std::string frame_id); + void GatherCameraInfo(cv::Mat &intrinsic_matrix, cv::Mat &distortion_coefficients); + bool CreateUserImage(const std_msgs::msg::Header &cloud_header, const sensor_msgs::msg::PointField &userField, cv_bridge::CvImage &userImage); + bool CreateDepthImage(const std_msgs::msg::Header &cloud_header, cv_bridge::CvImage& depthImage); + std::vector ProjectPoints(const sensor_msgs::msg::PointCloud2 &output_cloud, + const sensor_msgs::msg::PointField &xField, const sensor_msgs::msg::PointField &yField, const sensor_msgs::msg::PointField &zField, + const cv::Mat &intrinsic_matrix, const cv::Mat &distortion_coefficients, std::vector &obj_pts); + cv_bridge::CvImage NormalizeImage(const cv_bridge::CvImage &inputImage); + cv_bridge::CvImage ConvertToColor(const cv::Mat &depthMask, const cv_bridge::CvImage &inputImage); + + private: + rclcpp::Subscription::SharedPtr ros_sub_; + + std::unique_ptr tf_listener_{nullptr}; + std::unique_ptr tf_buffer_; + + geometry_msgs::msg::TransformStamped transform_optical_; + + std::string frame_id_; + double wait_for_tf_delay_; + std::string field_; + int height_, width_, pixel_size_; + double focal_length_; + bool normalize_; + bool colorize_; + rclcpp::CallbackGroup::SharedPtr cbg_sub_; +}; + +class PointCloud2SubscriberType : public SubscriberType +{ + public: + std::shared_ptr create_subscriber(rclcpp::Node::SharedPtr node); +}; + +} //web_video_server diff --git a/include/web_video_server/subscribers/ros_subscriber.hpp b/include/web_video_server/subscribers/ros_subscriber.hpp new file mode 100644 index 0000000..3dcf816 --- /dev/null +++ b/include/web_video_server/subscribers/ros_subscriber.hpp @@ -0,0 +1,49 @@ + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include "web_video_server/utils.hpp" + +namespace web_video_server +{ + typedef std::function ImageCallback; + +class RosSubscriber +{ + public: + + RosSubscriber(rclcpp::Node::SharedPtr node); + + ~RosSubscriber(); + + virtual void subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const ImageCallback& callback); + + virtual void subscriberCallback(const sensor_msgs::msg::Image::ConstSharedPtr &input_msg); + + std::mutex subscriber_mutex_; + + protected: + rclcpp::Node::SharedPtr node_; + std::string qos_profile_name_; + + ImageCallback callback_; + + rclcpp::Subscription::SharedPtr ros_sub_; +}; + +class SubscriberType +{ + public: + virtual std::shared_ptr create_subscriber(rclcpp::Node::SharedPtr node) = 0; +}; + +} // web_video_server diff --git a/include/web_video_server/web_video_server.hpp b/include/web_video_server/web_video_server.hpp index 6e9bd95..4cf776b 100644 --- a/include/web_video_server/web_video_server.hpp +++ b/include/web_video_server/web_video_server.hpp @@ -96,6 +96,7 @@ class WebVideoServer : public rclcpp::Node private: void restreamFrames(std::chrono::duration max_age); void cleanup_inactive_streams(); + void initializeHttpServer(int server_threads); rclcpp::TimerBase::SharedPtr cleanup_timer_; @@ -110,9 +111,9 @@ class WebVideoServer : public rclcpp::Node std::shared_ptr server_; async_web_server_cpp::HttpRequestHandlerGroup handler_group_; - std::vector> image_subscribers_; + std::vector> image_streamers_; std::map> stream_types_; - std::mutex subscriber_mutex_; + std::mutex streamer_mutex_; }; } // namespace web_video_server diff --git a/log/COLCON_IGNORE b/log/COLCON_IGNORE new file mode 100644 index 0000000..e69de29 diff --git a/package.xml b/package.xml index f85aa7b..bce1d02 100644 --- a/package.xml +++ b/package.xml @@ -36,6 +36,11 @@ sensor_msgs boost + tf2 + tf2_ros + tf2_sensor_msgs + tf2_geometry_msgs + ament_lint_auto ament_cmake_copyright ament_cmake_cpplint diff --git a/src/image_streamer.cpp b/src/image_streamer.cpp index 564e1dc..c50e2ff 100644 --- a/src/image_streamer.cpp +++ b/src/image_streamer.cpp @@ -44,34 +44,36 @@ namespace web_video_server ImageStreamer::ImageStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: connection_(connection), request_(request), node_(node), inactive_(false) +: connection_(connection), request_(request), node_(node), inactive_(false), initialized_(false) { + subscriber_types_["image_transport"] = std::shared_ptr(new ImageTransportSubscriberType()); + subscriber_types_["pointcloud2"] = std::shared_ptr(new PointCloud2SubscriberType()); + topic_ = request.get_query_param_value_or_default("topic", ""); -} - -ImageStreamer::~ImageStreamer() -{ -} -ImageTransportImageStreamer::ImageTransportImageStreamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageStreamer(request, connection, node), it_(node), initialized_(false) -{ output_width_ = request.get_query_param_value_or_default("width", -1); output_height_ = request.get_query_param_value_or_default("height", -1); invert_ = request.has_query_param("invert"); - default_transport_ = request.get_query_param_value_or_default("default_transport", "raw"); - qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); } -ImageTransportImageStreamer::~ImageTransportImageStreamer() +ImageStreamer::~ImageStreamer() { + std::scoped_lock lock(send_mutex_); // protects sendImage. + + // Clear the subscriber callback to prevent use-after-free + if (subscriber_) { + // Set callback to empty function to avoid calling destroyed ImageStreamer + subscriber_->subscribe( + request_, topic_, + [](const sensor_msgs::msg::Image::ConstSharedPtr&) { + // Empty callback - do nothing + } + ); + } } -void ImageTransportImageStreamer::start() +void ImageStreamer::start() { - image_transport::TransportHints hints(node_.get(), default_transport_); auto tnat = node_->get_topic_names_and_types(); inactive_ = true; for (auto topic_and_types : tnat) { @@ -80,37 +82,34 @@ void ImageTransportImageStreamer::start() continue; } auto & topic_name = topic_and_types.first; + auto & topic_type = topic_and_types.second[0]; if (topic_name == topic_ || (topic_name.find("/") == 0 && topic_name.substr(1) == topic_)) { inactive_ = false; + + if (topic_type == "sensor_msgs/msg/Image") { + subscriber_ = subscriber_types_["image_transport"]->create_subscriber(node_); + subscriber_->subscribe(request_, topic_, + std::bind(&ImageStreamer::imageCallback, this, std::placeholders::_1)); + } + else if (topic_type == "sensor_msgs/msg/PointCloud2") { + subscriber_ = subscriber_types_["pointcloud2"]->create_subscriber(node_); + subscriber_->subscribe(request_, topic_, + std::bind(&ImageStreamer::imageCallback, this, std::placeholders::_1)); + } + break; } } - // Get QoS profile from query parameter - RCLCPP_INFO( - node_->get_logger(), "Streaming topic %s with QoS profile %s", topic_.c_str(), - qos_profile_name_.c_str()); - auto qos_profile = get_qos_profile_from_name(qos_profile_name_); - if (!qos_profile) { - qos_profile = rmw_qos_profile_default; - RCLCPP_ERROR( - node_->get_logger(), - "Invalid QoS profile %s specified. Using default profile.", - qos_profile_name_.c_str()); - } + - // Create subscriber - image_sub_ = image_transport::create_subscription( - node_.get(), topic_, - std::bind(&ImageTransportImageStreamer::imageCallback, this, std::placeholders::_1), - default_transport_, qos_profile.value()); } -void ImageTransportImageStreamer::initialize(const cv::Mat &) +void ImageStreamer::initialize(const cv::Mat &) { } -void ImageTransportImageStreamer::restreamFrame(std::chrono::duration max_age) +void ImageStreamer::restreamFrame(std::chrono::duration max_age) { if (inactive_ || !initialized_) { return; @@ -119,7 +118,7 @@ void ImageTransportImageStreamer::restreamFrame(std::chrono::duration ma if (last_frame_ + max_age < std::chrono::steady_clock::now()) { std::scoped_lock lock(send_mutex_); // don't update last_frame, it may remain an old value. - sendImage(output_size_image, std::chrono::steady_clock::now()); + sendImage(output_size_image_, std::chrono::steady_clock::now()); } } catch (boost::system::system_error & e) { // happens when client disconnects @@ -139,15 +138,31 @@ void ImageTransportImageStreamer::restreamFrame(std::chrono::duration ma } } -cv::Mat ImageTransportImageStreamer::decodeImage( +cv::Mat ImageStreamer::decodeImage( const sensor_msgs::msg::Image::ConstSharedPtr & msg) { - if (msg->encoding.find("F") != std::string::npos) { + // Handle grayscale formats + if (msg->encoding == sensor_msgs::image_encodings::TYPE_8UC1|| + msg->encoding == sensor_msgs::image_encodings::TYPE_8SC1|| + msg->encoding == sensor_msgs::image_encodings::TYPE_16UC1|| + msg->encoding == sensor_msgs::image_encodings::TYPE_16SC1) { + RCLCPP_DEBUG(node_->get_logger(), "Greyscale format: %s", msg->encoding.c_str()); + cv::Mat image_bridge = cv_bridge::toCvCopy(msg, msg->encoding.c_str())->image; + cv::Mat normalized_image; + cv::normalize(image_bridge, normalized_image, 0, 255, cv::NORM_MINMAX); + + return normalized_image; + } + // Handle floating point images + else if (msg->encoding.find("F") != std::string::npos) { + RCLCPP_DEBUG(node_->get_logger(), "Floating point format: %s", msg->encoding.c_str()); // scale floating point images cv::Mat float_image_bridge = cv_bridge::toCvCopy(msg, msg->encoding)->image; cv::Mat_ float_image = float_image_bridge; + cv::Mat nonInfMask = (float_image < std::numeric_limits::max()); double max_val; - cv::minMaxIdx(float_image, 0, &max_val); + int minLoc, maxLoc; + cv::minMaxIdx(float_image, 0, &max_val, &minLoc, &maxLoc, nonInfMask); if (max_val > 0) { float_image *= (255 / max_val); @@ -155,11 +170,12 @@ cv::Mat ImageTransportImageStreamer::decodeImage( return float_image; } else { // Convert to OpenCV native BGR color + auto & clk = *node_->get_clock(); return cv_bridge::toCvCopy(msg, "bgr8")->image; } } -void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::ConstSharedPtr & msg) +void ImageStreamer::imageCallback(const sensor_msgs::msg::Image::ConstSharedPtr & msg) { if (inactive_) { return; @@ -168,6 +184,7 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C cv::Mat img; try { img = decodeImage(msg); + int input_width = img.cols; int input_height = img.rows; @@ -184,23 +201,23 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C cv::flip(img, img, true); } - std::scoped_lock lock(send_mutex_); // protects output_size_image + std::scoped_lock lock(send_mutex_); // protects output_size_image_ if (output_width_ != input_width || output_height_ != input_height) { cv::Mat img_resized; cv::Size new_size(output_width_, output_height_); cv::resize(img, img_resized, new_size); - output_size_image = img_resized; + output_size_image_ = img_resized; } else { - output_size_image = img; + output_size_image_ = img; } if (!initialized_) { - initialize(output_size_image); + initialize(output_size_image_); initialized_ = true; } last_frame_ = std::chrono::steady_clock::now(); - sendImage(output_size_image, last_frame_); + sendImage(output_size_image_, last_frame_); } catch (cv_bridge::Exception & e) { auto & clk = *node_->get_clock(); RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "cv_bridge exception: %s", e.what()); diff --git a/src/jpeg_streamers.cpp b/src/jpeg_streamers.cpp index bfc6c62..919ea56 100644 --- a/src/jpeg_streamers.cpp +++ b/src/jpeg_streamers.cpp @@ -37,7 +37,7 @@ namespace web_video_server MjpegStreamer::MjpegStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageTransportImageStreamer(request, connection, node), +: ImageStreamer(request, connection, node), stream_(connection) { quality_ = request.get_query_param_value_or_default("quality", 95); @@ -46,8 +46,8 @@ MjpegStreamer::MjpegStreamer( MjpegStreamer::~MjpegStreamer() { + std::scoped_lock lock(send_mutex_); this->inactive_ = true; - std::scoped_lock lock(send_mutex_); // protects sendImage. } void MjpegStreamer::sendImage( @@ -85,15 +85,15 @@ JpegSnapshotStreamer::JpegSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageTransportImageStreamer(request, connection, node) +: ImageStreamer(request, connection, node) { quality_ = request.get_query_param_value_or_default("quality", 95); } JpegSnapshotStreamer::~JpegSnapshotStreamer() { + std::scoped_lock lock(send_mutex_); this->inactive_ = true; - std::scoped_lock lock(send_mutex_); // protects sendImage. } void JpegSnapshotStreamer::sendImage( diff --git a/src/libav_streamer.cpp b/src/libav_streamer.cpp index 62f293f..815787a 100644 --- a/src/libav_streamer.cpp +++ b/src/libav_streamer.cpp @@ -43,7 +43,7 @@ LibavStreamer::LibavStreamer( async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node, const std::string & format_name, const std::string & codec_name, const std::string & content_type) -: ImageTransportImageStreamer(request, connection, node), format_context_(0), codec_(0), +: ImageStreamer(request, connection, node), format_context_(0), codec_(0), codec_context_(0), video_stream_(0), opt_(0), frame_(0), sws_context_(0), first_image_received_(false), first_image_time_(), format_name_(format_name), codec_name_(codec_name), content_type_(content_type), io_buffer_(0) @@ -56,6 +56,8 @@ LibavStreamer::LibavStreamer( LibavStreamer::~LibavStreamer() { + std::scoped_lock lock(send_mutex_); + if (codec_context_) { avcodec_free_context(&codec_context_); } diff --git a/src/png_streamers.cpp b/src/png_streamers.cpp index c3c9327..cb23969 100644 --- a/src/png_streamers.cpp +++ b/src/png_streamers.cpp @@ -42,7 +42,7 @@ namespace web_video_server PngStreamer::PngStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageTransportImageStreamer(request, connection, node), stream_(connection) +: ImageStreamer(request, connection, node), stream_(connection) { quality_ = request.get_query_param_value_or_default("quality", 3); stream_.sendInitialHeader(); @@ -61,7 +61,7 @@ cv::Mat PngStreamer::decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & return cv_bridge::toCvCopy(msg, "bgra8")->image; } else { // Use the normal decode otherwise - return ImageTransportImageStreamer::decodeImage(msg); + return ImageStreamer::decodeImage(msg); } } @@ -98,15 +98,15 @@ PngSnapshotStreamer::PngSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageTransportImageStreamer(request, connection, node) +: ImageStreamer(request, connection, node) { quality_ = request.get_query_param_value_or_default("quality", 3); } PngSnapshotStreamer::~PngSnapshotStreamer() { - this->inactive_ = true; std::scoped_lock lock(send_mutex_); // protects sendImage. + this->inactive_ = true; } cv::Mat PngSnapshotStreamer::decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg) @@ -116,7 +116,7 @@ cv::Mat PngSnapshotStreamer::decodeImage(const sensor_msgs::msg::Image::ConstSha return cv_bridge::toCvCopy(msg, "bgra8")->image; } else { // Use the normal decode otherwise - return ImageTransportImageStreamer::decodeImage(msg); + return ImageStreamer::decodeImage(msg); } } diff --git a/src/ros_compressed_streamer.cpp b/src/ros_compressed_streamer.cpp index 76c965d..2ec6bab 100644 --- a/src/ros_compressed_streamer.cpp +++ b/src/ros_compressed_streamer.cpp @@ -44,8 +44,8 @@ RosCompressedStreamer::RosCompressedStreamer( RosCompressedStreamer::~RosCompressedStreamer() { - this->inactive_ = true; std::scoped_lock lock(send_mutex_); // protects sendImage. + this->inactive_ = true; } void RosCompressedStreamer::start() @@ -71,7 +71,7 @@ void RosCompressedStreamer::start() qos_profile.value()); image_sub_ = node_->create_subscription( compressed_topic, qos, - std::bind(&RosCompressedStreamer::imageCallback, this, std::placeholders::_1)); + std::bind(&RosCompressedStreamer::compressedImageCallback, this, std::placeholders::_1)); } void RosCompressedStreamer::restreamFrame(std::chrono::duration max_age) @@ -83,11 +83,16 @@ void RosCompressedStreamer::restreamFrame(std::chrono::duration max_age) if (last_frame_ + max_age < std::chrono::steady_clock::now()) { std::scoped_lock lock(send_mutex_); // don't update last_frame, it may remain an old value. - sendImage(last_msg, std::chrono::steady_clock::now()); + sendCompressedImage(last_msg, std::chrono::steady_clock::now()); } } -void RosCompressedStreamer::sendImage( +void RosCompressedStreamer::sendImage(const cv::Mat & img, const std::chrono::steady_clock::time_point & time) +{ + /// sendImage is replaced by sendCompressedImage for this streamer +} + +void RosCompressedStreamer::sendCompressedImage( const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, const std::chrono::steady_clock::time_point & time) { @@ -126,13 +131,13 @@ void RosCompressedStreamer::sendImage( } -void RosCompressedStreamer::imageCallback( +void RosCompressedStreamer::compressedImageCallback( const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg) { std::scoped_lock lock(send_mutex_); // protects last_msg and last_frame last_msg = msg; last_frame_ = std::chrono::steady_clock::now(); - sendImage(last_msg, last_frame_); + sendCompressedImage(last_msg, last_frame_); } diff --git a/src/subscribers/image_transport_subscriber.cpp b/src/subscribers/image_transport_subscriber.cpp new file mode 100644 index 0000000..8fcc640 --- /dev/null +++ b/src/subscribers/image_transport_subscriber.cpp @@ -0,0 +1,61 @@ +#include "web_video_server/subscribers/image_transport_subscriber.hpp" + +namespace web_video_server +{ + +ImageTransportSubscriber::ImageTransportSubscriber(rclcpp::Node::SharedPtr node) +: RosSubscriber(node) +{ +} + +ImageTransportSubscriber::~ImageTransportSubscriber() +{ + std::scoped_lock lock(subscriber_mutex_); +} + +void ImageTransportSubscriber::subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const ImageCallback& callback) +{ + std::scoped_lock lock(subscriber_mutex_); + + callback_ = callback; + std::string transport = request.get_query_param_value_or_default("transport", "raw"); + qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); + + // Get QoS profile from query parameter + RCLCPP_INFO( + node_->get_logger(), "Streaming topic %s with QoS profile %s", topic.c_str(), + qos_profile_name_.c_str()); + auto qos_profile = get_qos_profile_from_name(qos_profile_name_); + if (!qos_profile) { + qos_profile = rmw_qos_profile_default; + RCLCPP_ERROR( + node_->get_logger(), + "Invalid QoS profile %s specified. Using default profile.", + qos_profile_name_.c_str()); + } + + const auto qos = qos_profile.value(); + + ros_sub_ = image_transport::create_subscription(node_.get(), topic, + std::bind(&ImageTransportSubscriber::subscriberCallback, this, std::placeholders::_1), + transport, qos); +} + +void ImageTransportSubscriber::subscriberCallback(const sensor_msgs::msg::Image::ConstSharedPtr &input_msg) +{ + std::scoped_lock lock(subscriber_mutex_); + + callback_(input_msg); +} + + +std::shared_ptr ImageTransportSubscriberType::create_subscriber(rclcpp::Node::SharedPtr node) +{ + return std::shared_ptr( + new ImageTransportSubscriber(node)); +} + + +} \ No newline at end of file diff --git a/src/subscribers/pointcloud2_subscriber.cpp b/src/subscribers/pointcloud2_subscriber.cpp new file mode 100644 index 0000000..43052bf --- /dev/null +++ b/src/subscribers/pointcloud2_subscriber.cpp @@ -0,0 +1,746 @@ +#include "web_video_server/subscribers/pointcloud2_subscriber.hpp" + +namespace web_video_server +{ + +PointCloud2Subscriber::PointCloud2Subscriber(rclcpp::Node::SharedPtr node) +: RosSubscriber(node) +{ + std::scoped_lock lock(subscriber_mutex_); + RCLCPP_INFO_STREAM(node_->get_logger(),"Create PointCloud2 subscriber: "); + // Initialize our TF items + tf_buffer_ = std::make_unique(node->get_clock(), std::chrono::seconds(10)); + tf_listener_ = std::make_unique(*tf_buffer_, node, true); + + tf2::Quaternion q; + q.setRPY(M_PI / 2.0, - M_PI / 2.0, 0.0); + transform_optical_.transform.translation.x = 0.0; + transform_optical_.transform.translation.y = 0.0; + transform_optical_.transform.translation.z = 0.0; + transform_optical_.transform.rotation.x = q.x(); + transform_optical_.transform.rotation.y = q.y(); + transform_optical_.transform.rotation.z = q.z(); + transform_optical_.transform.rotation.w = q.w(); +} + +PointCloud2Subscriber::~PointCloud2Subscriber() +{ + std::scoped_lock lock(subscriber_mutex_); +} + +void PointCloud2Subscriber::subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const ImageCallback& callback) +{ + std::scoped_lock lock(subscriber_mutex_); + callback_ = callback; + qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); + + wait_for_tf_delay_ = node_->get_parameter("wait_for_tf_delay").as_double(); + wait_for_tf_delay_ = 0.10; + + std::string default_frame_id = node_->get_parameter("frame_id").as_string(); + frame_id_ = request.get_query_param_value_or_default("frame_id", default_frame_id); + + bool default_color = node_->get_parameter("colorize").as_bool(); + colorize_ = request.get_query_param_value_or_default("colorize", true); + + bool default_normalize = node_->get_parameter("normalize").as_bool(); + normalize_ = request.get_query_param_value_or_default("normalize", default_normalize); + + std::string default_field = node_->get_parameter("field").as_string(); + field_ = request.get_query_param_value_or_default("field", default_field); + + height_ = request.get_query_param_value_or_default("height", 600); + width_ = request.get_query_param_value_or_default("width", 800); + pixel_size_ = request.get_query_param_value_or_default("pixel_size", 5); + focal_length_ = request.get_query_param_value_or_default("focal_length", 300.0); + + RCLCPP_INFO( + node_->get_logger(), "Streaming topic %s with QoS profile %s", topic.c_str(), + qos_profile_name_.c_str()); + auto qos_profile = get_qos_profile_from_name(qos_profile_name_); + if (!qos_profile) { + qos_profile = rmw_qos_profile_default; + RCLCPP_ERROR( + node_->get_logger(), + "Invalid QoS profile %s specified. Using default profile.", + qos_profile_name_.c_str()); + } + + const auto qos = rclcpp::QoS( + rclcpp::QoSInitialization(qos_profile.value().history, 1), + qos_profile.value()); + + cbg_sub_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + rclcpp::SubscriptionOptions options; + options.callback_group = cbg_sub_; + ros_sub_ = node_->create_subscription(topic, qos, std::bind(&PointCloud2Subscriber::subscriberCallback, this, std::placeholders::_1), options); +} + +void PointCloud2Subscriber::subscriberCallback(const sensor_msgs::msg::PointCloud2::ConstSharedPtr &input_msg) +{ + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Received PointCloud2: "); + std::scoped_lock lock(subscriber_mutex_); + + if (input_msg->data.size() == 0) + { + RCLCPP_WARN_STREAM_THROTTLE(node_->get_logger(), *node_->get_clock(), 1000, "No data in pointcloud!"); + return; + } + + // Start timer + auto beginTime = std::chrono::steady_clock::now(); + + //transform + sensor_msgs::msg::PointCloud2 output_cloud; + output_cloud = TransformFrame(input_msg, frame_id_); + + // Find relevant fields + sensor_msgs::msg::PointField xField, yField, zField, userField; + userField.name = field_; + if(!FindFields(input_msg, userField, xField, yField, zField)) return; + + // Setup camera_info + RCLCPP_DEBUG_STREAM(node_->get_logger()," Camera Info"); + cv::Mat intrinsic_matrix, distortion_coefficients; + GatherCameraInfo(intrinsic_matrix, distortion_coefficients); + + // Setup depth image + RCLCPP_DEBUG_STREAM(node_->get_logger()," Depth"); + cv_bridge::CvImage depthImage; + CreateDepthImage(output_cloud.header, depthImage); + + // Setup user image + RCLCPP_DEBUG_STREAM(node_->get_logger()," User-Datatype: " << +userField.datatype); + cv_bridge::CvImage userImage; + if(!CreateUserImage(output_cloud.header, userField, userImage)) return; + + // Project Points + RCLCPP_DEBUG_STREAM(node_->get_logger()," Project points"); + std::vector obj_pts; + std::vector img_pts = ProjectPoints(output_cloud, xField, yField, zField, intrinsic_matrix, distortion_coefficients, obj_pts); + + // Process projected points and render to image + cv::Mat depthMask = cv::Mat::zeros(height_, width_, CV_8UC1); + // Loop through points and fillout user and depth images + RCLCPP_DEBUG_STREAM(node_->get_logger()," Process projected points: " << img_pts.size()); + for (size_t i = 0; i < img_pts.size(); ++i) + { + int u = int(img_pts[i].x); + int v = int(img_pts[i].y); + + // Check if point is inside field of view and has valid projection + // Filter out rear points that are incorrectly projected into front view + // Keep points that are properly in front of sensor for current camera view + // RELAXED BOUNDS: Allow points slightly outside FOV to capture more lidar data + if ((u >= -pixel_size_) && (u < width_ + pixel_size_) && + (v >= -pixel_size_) && (v < height_ + pixel_size_) && + obj_pts[i].z > 0.1) // Filter out points behind/very close to sensor that cause incorrect projection + { + // update depth image + if(depthImage.image.at(v, u) > obj_pts[i].z) + { + // draw box around each pixel based on pixel_size param + int shift = pixel_size_ / 2; + int lowerIndex1 = v - shift; + if(lowerIndex1 < 0) lowerIndex1 = 0; + int upperIndex1 = v + shift; + if(upperIndex1 > height_ - 1) upperIndex1 = height_ - 1; + int lowerIndex2 = u - shift; + if(lowerIndex2 < 0) lowerIndex2 = 0; + int upperIndex2 = u + shift; + if(upperIndex2 > width_ - 1) upperIndex2 = width_ - 1; + for(int j = lowerIndex1; j <= upperIndex1; j++) + { + for(int k = lowerIndex2; k <= upperIndex2; k++) + { + // Update the depth image if point is closer to camera + depthImage.image.at(j, k) = obj_pts[i].z; + depthMask.at(j, k) = 255; // 255 = has data, 0 = no data + + // user image - write data for the closest point (depth buffer was already updated) + if( (userField.datatype == sensor_msgs::msg::PointField::UINT8 && userField.count == 4) || + (userField.datatype == sensor_msgs::msg::PointField::UINT32 && (userField.name == "rgb" || userField.name == "rgba")) || + (userField.datatype == sensor_msgs::msg::PointField::FLOAT32 && (userField.name == "rgb" || userField.name == "rgba")) + ) + { + // Handle RGB/RGBA fields - these are packed color data + uint8_t user1, user2, user3; + std::memcpy(&user1, &output_cloud.data[i * output_cloud.point_step + userField.offset + 0], sizeof(uint8_t)); + std::memcpy(&user2, &output_cloud.data[i * output_cloud.point_step + userField.offset + 1], sizeof(uint8_t)); + std::memcpy(&user3, &output_cloud.data[i * output_cloud.point_step + userField.offset + 2], sizeof(uint8_t)); + cv::Vec3b pixel; + pixel[0] = user1; + pixel[1] = user2; + pixel[2] = user3; + userImage.image.at(j, k) = pixel; + } + else if((userField.datatype == sensor_msgs::msg::PointField::UINT16 && userField.count == 4) || + (userField.datatype == sensor_msgs::msg::PointField::FLOAT64 && (userField.name == "rgb" || userField.name == "rgba")) + ) + { + uint16_t user1, user2, user3; + std::memcpy(&user1, &output_cloud.data[i * output_cloud.point_step + userField.offset + 0], sizeof(uint16_t)); + std::memcpy(&user2, &output_cloud.data[i * output_cloud.point_step + userField.offset + 2], sizeof(uint16_t)); + std::memcpy(&user3, &output_cloud.data[i * output_cloud.point_step + userField.offset + 4], sizeof(uint16_t)); + cv::Vec3w bgr_pixel; + bgr_pixel[0] = user1; + bgr_pixel[1] = user2; + bgr_pixel[2] = user3; + userImage.image.at(j, k) = bgr_pixel; + } + else if((userField.datatype == sensor_msgs::msg::PointField::UINT8)) + { + uint8_t value; + std::memcpy(&value, &output_cloud.data[i * output_cloud.point_step + userField.offset],sizeof(uint8_t)); + userImage.image.at(j, k) = value; + } + else if((userField.datatype == sensor_msgs::msg::PointField::INT8)) + { + int8_t value; + std::memcpy(&value, &output_cloud.data[i * output_cloud.point_step + userField.offset],sizeof(int8_t)); + userImage.image.at(j, k) = value; + } + else if((userField.datatype == sensor_msgs::msg::PointField::UINT16)) + { + uint16_t value; + std::memcpy(&value, &output_cloud.data[i * output_cloud.point_step + userField.offset],sizeof(uint16_t)); + userImage.image.at(j, k) = value; + } + else if((userField.datatype == sensor_msgs::msg::PointField::INT16)) + { + int16_t value; + std::memcpy(&value, &output_cloud.data[i * output_cloud.point_step + userField.offset],sizeof(int16_t)); + userImage.image.at(j, k) = value; + } + else if((userField.datatype == sensor_msgs::msg::PointField::FLOAT32)) + { + float value; + std::memcpy(&value, &output_cloud.data[i * output_cloud.point_step + userField.offset],sizeof(float)); + userImage.image.at(j, k) = value; + } + else if((userField.datatype == sensor_msgs::msg::PointField::FLOAT64)) + { + double value; + std::memcpy(&value, &output_cloud.data[i * output_cloud.point_step + userField.offset],sizeof(double)); + userImage.image.at(j, k) = value; + } + } + } + } + } + } + + // Normalize + cv_bridge::CvImage normalized_image; + if (field_ == "depth") { + normalized_image = depthImage; + } else if (normalize_) { + normalized_image = NormalizeImage(userImage); + } else { + normalized_image = userImage; + } + + // Convert to color + cv_bridge::CvImage colorImage; + if (colorize_) { + colorImage = ConvertToColor(depthMask, normalized_image); + } else { + colorImage = normalized_image; + } + + // Performance timing + auto endTime = std::chrono::steady_clock::now(); + auto totalTime = endTime - beginTime; + auto timeMS = std::chrono::duration_cast(totalTime); + RCLCPP_DEBUG_STREAM(node_->get_logger(), "Processing time: " << timeMS.count() << "ms"); + sensor_msgs::msg::Image output_msg; + colorImage.toImageMsg(output_msg); + sensor_msgs::msg::Image::ConstSharedPtr output_ptr = std::make_shared(output_msg); + callback_(output_ptr); + + return; +} + +bool PointCloud2Subscriber::compareFieldsOffset(sensor_msgs::msg::PointField& field1, sensor_msgs::msg::PointField& field2) +{ + return (field1.offset < field2.offset); +} + +inline int sizeOfPointField(int datatype) +{ + if ((datatype == sensor_msgs::msg::PointField::INT8) || (datatype == sensor_msgs::msg::PointField::UINT8)) + return 1; + else if ((datatype == sensor_msgs::msg::PointField::INT16) || (datatype == sensor_msgs::msg::PointField::UINT16)) + return 2; + else if ((datatype == sensor_msgs::msg::PointField::INT32) || (datatype == sensor_msgs::msg::PointField::UINT32) || + (datatype == sensor_msgs::msg::PointField::FLOAT32)) + return 4; + else if (datatype == sensor_msgs::msg::PointField::FLOAT64) + return 8; + else + { + std::stringstream err; + err << "PointField of type " << datatype << " does not exist"; + throw std::runtime_error(err.str()); + } + return -1; +} + +bool PointCloud2Subscriber::FindFields(const sensor_msgs::msg::PointCloud2::ConstSharedPtr &input_msg, sensor_msgs::msg::PointField &userField, + sensor_msgs::msg::PointField &xField, sensor_msgs::msg::PointField &yField, sensor_msgs::msg::PointField &zField) +{ + std::vector sortedFields(input_msg->fields); + std::sort(sortedFields.begin(), sortedFields.end(), PointCloud2Subscriber::compareFieldsOffset); + + // Find fields we need in the cloud + bool xFound = false, yFound = false, zFound = false; + bool userFound = false; + for (size_t i = 0; i < sortedFields.size(); i++) + { + sensor_msgs::msg::PointField currentField = sortedFields[i]; + + if(currentField.name == "x") + { + xFound = true; + xField = currentField; + } + else if(currentField.name == "y") + { + yFound = true; + yField = currentField; + } + else if(currentField.name == "z") + { + zFound = true; + zField = currentField; + } + if(currentField.name == userField.name) + { + userFound = true; + userField = currentField; + } + } + + //Check found fields + if(!xFound || !yFound || !zFound) + { + RCLCPP_WARN_STREAM(node_->get_logger(),"Cloud does not contain XYZ data!"); + return false; + } + else if(xField.datatype != 7 || yField.datatype != 7 || zField.datatype != 7) + { + RCLCPP_WARN_STREAM(node_->get_logger(),"X, Y, Z fields do not contain floats!"); + return false; + } + + return true; +} + +sensor_msgs::msg::PointCloud2 PointCloud2Subscriber::TransformFrame(const sensor_msgs::msg::PointCloud2::ConstSharedPtr &input_msg, std::string frame_id) +{ + RCLCPP_DEBUG_STREAM(node_->get_logger()," Transform"); + sensor_msgs::msg::PointCloud2 output_cloud; + geometry_msgs::msg::TransformStamped transform; + try + { + transform_optical_.header = transform.header; + if(frame_id_ == "") { + transform = transform_optical_; + } else { + if(tf_buffer_->canTransform(frame_id, input_msg->header.frame_id, input_msg->header.stamp, rclcpp::Duration::from_seconds(wait_for_tf_delay_))) + { + transform = tf_buffer_->lookupTransform(frame_id, input_msg->header.frame_id, input_msg->header.stamp); + + // Transform into a z-forward orientation of requested frame for opencv + tf2::doTransform(transform.transform, transform.transform, transform_optical_); + } + else + { + RCLCPP_WARN_STREAM(node_->get_logger(), " PointCloud2 subscriber is waiting for transform from " << input_msg->header.frame_id << " to " << frame_id << " to become available."); + transform = transform_optical_; + } + } + tf2::doTransform(*input_msg, output_cloud, transform); + } + catch (tf2::TransformException &ex) + { + RCLCPP_WARN_STREAM(node_->get_logger(), " PointCloud2 subscriber: " << ex.what()); + output_cloud = *input_msg; + } + + return output_cloud; +} + +void PointCloud2Subscriber::GatherCameraInfo(cv::Mat &intrinsic_matrix, cv::Mat &distortion_coefficients) +{ + // Setup camera_info + distortion_coefficients = cv::Mat::zeros(1, 5, CV_64F); + + // Intrinsic Matrix + intrinsic_matrix = cv::Mat::zeros(3, 3, CV_64F); + intrinsic_matrix.at(0,0) = focal_length_; + intrinsic_matrix.at(1,1) = focal_length_; + intrinsic_matrix.at(0,2) = width_/2; + intrinsic_matrix.at(1,2) = height_/2; + intrinsic_matrix.at(2,2) = 1.0; +} + +bool PointCloud2Subscriber::CreateUserImage(const std_msgs::msg::Header &cloud_header, const sensor_msgs::msg::PointField &userField, cv_bridge::CvImage& userImage) +{ + userImage.header = cloud_header; + if(frame_id_ != "") userImage.header.frame_id = frame_id_; + if((userField.datatype == sensor_msgs::msg::PointField::UINT8 && userField.count == 4) || + (userField.datatype == sensor_msgs::msg::PointField::UINT32 && (userField.name == "rgb" || userField.name == "rgba")) || + (userField.datatype == sensor_msgs::msg::PointField::FLOAT32 && (userField.name == "rgb" || userField.name == "rgba")) + ) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 4 8-bit unsigned integers"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_8UC3; + userImage.image = cv::Mat::zeros(height_, width_, CV_8UC3); + } + else if((userField.datatype == sensor_msgs::msg::PointField::UINT16 && userField.count == 4) || + (userField.datatype == sensor_msgs::msg::PointField::FLOAT64 && (userField.name == "rgb" || userField.name == "rgba")) + ) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 4 16-bit unsigned integers"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_16UC3; + userImage.image = cv::Mat::zeros(height_, width_, CV_16UC3); + } + else if((userField.datatype == sensor_msgs::msg::PointField::UINT8)) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 1 8-bit unsigned integer"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_8UC1; + userImage.image = cv::Mat::zeros(height_, width_, CV_8UC1); + } + else if((userField.datatype == sensor_msgs::msg::PointField::INT8)) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 1 8-bit signed integer"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_8SC1; + userImage.image = cv::Mat::zeros(height_, width_, CV_8SC1); + } + else if((userField.datatype == sensor_msgs::msg::PointField::UINT16)) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 1 16-bit unsigned integer"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_16UC1; + userImage.image = cv::Mat::zeros(height_, width_, CV_16UC1); + } + else if((userField.datatype == sensor_msgs::msg::PointField::INT16)) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 1 16-bit signed integer"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_16SC1; + userImage.image = cv::Mat::zeros(height_, width_, CV_16SC1); + } + else if((userField.datatype == sensor_msgs::msg::PointField::FLOAT32)) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 1 32-bit float"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_32FC1; + userImage.image = cv::Mat::zeros(height_, width_, CV_32FC1); + } + else if((userField.datatype == sensor_msgs::msg::PointField::FLOAT64)) + { + RCLCPP_DEBUG_STREAM(node_->get_logger()," 1 64-bit float"); + userImage.encoding = sensor_msgs::image_encodings::TYPE_64FC1; + userImage.image = cv::Mat::zeros(height_, width_, CV_64FC1); + } + else if(field_ != "depth") + { + RCLCPP_WARN_STREAM(node_->get_logger(), "Requested field '" << field_ << "' not found in point cloud!"); + return false; + } + return true; +} + +bool PointCloud2Subscriber::CreateDepthImage(const std_msgs::msg::Header &cloud_header, cv_bridge::CvImage& depthImage) +{ + depthImage.header = cloud_header; + if(frame_id_ != "") depthImage.header.frame_id = frame_id_; + depthImage.encoding = sensor_msgs::image_encodings::TYPE_32FC1; + depthImage.image = cv::Mat::ones(height_, width_, CV_32FC1) * std::numeric_limits::max(); + + return true; +} + +std::vector PointCloud2Subscriber::ProjectPoints(const sensor_msgs::msg::PointCloud2 &output_cloud, + const sensor_msgs::msg::PointField &xField, const sensor_msgs::msg::PointField &yField, const sensor_msgs::msg::PointField &zField, + const cv::Mat &intrinsic_matrix, const cv::Mat &distortion_coefficients, std::vector &obj_pts) +{ + // Create 3D points for projection + int size = output_cloud.height * output_cloud.width; + RCLCPP_DEBUG_STREAM(node_->get_logger()," Create 3D points for projection: " << size); + obj_pts.reserve(size); // Reserve space for better performance + + for (int i = 0; i < size; ++i) + { + // Find index of xyz data + size_t point_start = i * output_cloud.point_step; + size_t x_access = point_start + xField.offset; + size_t y_access = point_start + yField.offset; + size_t z_access = point_start + zField.offset; + + // Validate buffer bounds for coordinate access using correct field sizes (fields previously confirmed to exist and be floats) + if (x_access + 4 > output_cloud.data.size() || + y_access + 4 > output_cloud.data.size() || + z_access + 4 > output_cloud.data.size()) { + RCLCPP_WARN_STREAM_THROTTLE(node_->get_logger(), *node_->get_clock(), 1000, + "Buffer access out of bounds for point " << i << ", skipping point"); + continue; + } + + // Extract X, Y, Z coordinates + float X,Y,Z; + std::memcpy(&X, &output_cloud.data[x_access], sizeof(float)); + std::memcpy(&Y, &output_cloud.data[y_access], sizeof(float)); + std::memcpy(&Z, &output_cloud.data[z_access], sizeof(float)); + obj_pts.push_back(cv::Point3f(X, Y, Z)); + } + + cv::Mat rvec = cv::Mat::zeros(3, 1, cv::DataType::type); + cv::Mat tvec = cv::Mat::zeros(3, 1, cv::DataType::type); + std::vector img_pts; + cv::projectPoints(obj_pts, rvec, tvec, intrinsic_matrix, distortion_coefficients, img_pts); + return img_pts; +} + +cv_bridge::CvImage PointCloud2Subscriber::NormalizeImage(const cv_bridge::CvImage &inputImage) +{ + cv_bridge::CvImage normalized_image; + if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_16UC3) { + cv::normalize(inputImage.image, normalized_image.image, 0, 65535, cv::NORM_MINMAX); + normalized_image.header = inputImage.header; + normalized_image.encoding = inputImage.encoding; + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_8UC3) { + cv::normalize(inputImage.image, normalized_image.image, 0, 255, cv::NORM_MINMAX); + normalized_image.header = inputImage.header; + normalized_image.encoding = inputImage.encoding; + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_16UC1) { + cv::normalize(inputImage.image, normalized_image.image, 0, 65535, cv::NORM_MINMAX); + normalized_image.header = inputImage.header; + normalized_image.encoding = inputImage.encoding; + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_16SC1) { + cv::normalize(inputImage.image, normalized_image.image, -32768, 32767, cv::NORM_MINMAX); + normalized_image.header = inputImage.header; + normalized_image.encoding = inputImage.encoding; + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_8UC1) { + cv::normalize(inputImage.image, normalized_image.image, 0, 255, cv::NORM_MINMAX); + normalized_image.header = inputImage.header; + normalized_image.encoding = inputImage.encoding; + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_8SC1) { + cv::normalize(inputImage.image, normalized_image.image, -128, 127, cv::NORM_MINMAX); + normalized_image.header = inputImage.header; + normalized_image.encoding = inputImage.encoding; + } else { + normalized_image = inputImage; + } + + return normalized_image; +} +cv_bridge::CvImage PointCloud2Subscriber::ConvertToColor(const cv::Mat &depthMask, const cv_bridge::CvImage &inputImage) +{ + // Create gradient background image (BGR format for depth visualization) + cv::Mat gradientBackground = cv::Mat::zeros(height_, width_, CV_8UC3); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + // Create vertical saturation-based gradient from saturated medium blue at top to black at bottom + // Medium blue color: RGB(0, 80, 200) -> BGR(200, 80, 0) + float saturation_ratio = 1.0f - (float)row / (float)height_; // 1.0 at top (full saturation), 0.0 at bottom (black) + uint8_t blue_value = (uint8_t)(200 * saturation_ratio); // Blue channel (200->0) + uint8_t green_value = (uint8_t)(80 * saturation_ratio); // Green channel (80->0) + uint8_t red_value = (uint8_t)(0 * saturation_ratio); // Red channel (0->0) + + cv::Vec3b& pixel = gradientBackground.at(row, col); + pixel[0] = blue_value; // B + pixel[1] = green_value; // G + pixel[2] = red_value; // R + } + } + + // Composite the final image by blending data with gradient background + // Where mask == 255 (has data): use actual data values + // Where mask == 0 (no data): use gradient background values + RCLCPP_DEBUG_STREAM(node_->get_logger(), " Final Compositing"); + // Setup color image + RCLCPP_DEBUG_STREAM(node_->get_logger()," Color"); + cv_bridge::CvImage colorImage; + colorImage.header = inputImage.header; + if(frame_id_ != "") colorImage.header.frame_id = frame_id_; + if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_32FC1) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_32FC1 to color"); + colorImage.encoding = sensor_msgs::image_encodings::BGR8; + colorImage.image = cv::Mat::zeros(height_, width_, 'bgr8'); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + // Pixel has real depth data - convert depth to grayscale and display as white/gray + float depth_value = inputImage.image.at(row, col); + // Normalize depth to 0-255 range (assuming max depth ~10 meters) + uint8_t intensity = static_cast(std::min(255.0f, depth_value * 2.55f)); // 10m -> 255 + + cv::Vec3b& pixel = colorImage.image.at(row, col); + pixel[0] = intensity; // B + pixel[1] = intensity; // G + pixel[2] = intensity; // R (grayscale) + } else { + // Pixel has no depth data - use gradient background value + colorImage.image.at(row, col) = gradientBackground.at(row, col); + } + } + } + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_32FC3) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_32FC3 to color"); + colorImage.encoding = sensor_msgs::image_encodings::BGR8; + colorImage.image = cv::Mat::zeros(height_, width_, 'bgr8'); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + // Pixel has real depth data - convert depth to grayscale and display as white/gray + cv::Vec3f depth_value = inputImage.image.at(row, col); + + cv::Vec3b& pixel = colorImage.image.at(row, col); + pixel[0] = static_cast(std::min(255.0f, depth_value[0] * 2.55f)); // B + pixel[1] = static_cast(std::min(255.0f, depth_value[1] * 2.55f)); // G + pixel[2] = static_cast(std::min(255.0f, depth_value[2] * 2.55f)); // R (grayscale) + } else { + // Pixel has no depth data - use gradient background value + colorImage.image.at(row, col) = gradientBackground.at(row, col); + } + } + } + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_16UC3) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_16UC3 to color"); + cv::Mat gradientBackground16; + gradientBackground.convertTo(gradientBackground16, CV_16UC3); + colorImage.encoding = sensor_msgs::image_encodings::BGR16; + colorImage.image = cv::Mat::zeros(height_, width_, 'bgr16'); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + colorImage.image.at(row, col) = inputImage.image.at(row, col); + } + else + { + // Pixel has no depth data - use gradient background value scaled to 16 bit + colorImage.image.at(row, col) = gradientBackground16.at(row, col); + } + } + } + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_8UC3) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_8UC3 to color"); + colorImage.encoding = sensor_msgs::image_encodings::BGR8; + colorImage.image = cv::Mat::zeros(height_, width_, 'bgr8'); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + colorImage.image.at(row, col) = inputImage.image.at(row, col); + } else { + // Pixel has no depth data - use gradient background value + colorImage.image.at(row, col) = gradientBackground.at(row, col); + } + } + } + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_8UC1) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_8UC1 to color"); + colorImage.encoding = sensor_msgs::image_encodings::BGR8; + colorImage.image = cv::Mat::zeros(height_, width_, 'bgr8'); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + // Pixel has real depth data - convert depth to grayscale and display as white/gray + float depth_value = inputImage.image.at(row, col); + // Normalize depth to 0-255 range (assuming max depth ~10 meters) + uint8_t intensity =depth_value; // 10m -> 255 + + cv::Vec3b& pixel = colorImage.image.at(row, col); + pixel[0] = intensity; // B + pixel[1] = intensity; // G + pixel[2] = intensity; // R (grayscale) + } else { + // Pixel has no depth data - use gradient background value + colorImage.image.at(row, col) = gradientBackground.at(row, col); + } + } + } + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_8SC1) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_8SC1 to color"); + colorImage.encoding = sensor_msgs::image_encodings::BGR8; + colorImage.image = cv::Mat::zeros(height_, width_, 'bgr8'); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + // Pixel has real depth data - convert depth to grayscale and display as white/gray + int8_t depth_value = inputImage.image.at(row, col); + // Normalize depth to 0-255 range + uint8_t intensity = static_cast(depth_value + 128); + + cv::Vec3b& pixel = colorImage.image.at(row, col); + pixel[0] = intensity; // B + pixel[1] = intensity; // G + pixel[2] = intensity; // R (grayscale) + } else { + // Pixel has no depth data - use gradient background value + colorImage.image.at(row, col) = gradientBackground.at(row, col); + } + } + } + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_16UC1) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_16UC1 to color"); + cv::Mat gradientBackground16; + gradientBackground.convertTo(gradientBackground16, CV_16UC3, 257.0, 0.0); + colorImage.encoding = sensor_msgs::image_encodings::BGR16; + colorImage.image = cv::Mat::zeros(height_, width_, 'bgr16'); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + // Pixel has real depth data - convert depth to grayscale and display as white/gray + uint16_t depth_value = inputImage.image.at(row, col); + // Normalize depth to 0-255 range + uint16_t intensity = static_cast(depth_value); + + cv::Vec3w& pixel = colorImage.image.at(row, col); + pixel[0] = intensity; // B + pixel[1] = intensity; // G + pixel[2] = intensity; // R (grayscale) + } else { + // Pixel has no depth data - use gradient background value scaled to 16 bit + colorImage.image.at(row, col) = gradientBackground16.at(row, col); + } + } + } + } else if(inputImage.encoding == sensor_msgs::image_encodings::TYPE_16SC1) { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Converting TYPE_16SC1 to color"); + cv::Mat gradientBackground16; + gradientBackground.convertTo(gradientBackground16, CV_16UC3, 257.0, 0.0); + for (int row = 0; row < height_; row++) { + for (int col = 0; col < width_; col++) { + if (depthMask.at(row, col) == 255) { + // Pixel has real depth data - convert depth to grayscale and display as white/gray + int16_t depth_value = inputImage.image.at(row, col); + // Normalize depth to 0-255 range + uint16_t intensity = static_cast(depth_value + 32768); + + cv::Vec3w& pixel = colorImage.image.at(row, col); + pixel[0] = intensity; // B + pixel[1] = intensity; // G + pixel[2] = intensity; // R (grayscale) + } else { + // Pixel has no depth data - use gradient background value scaled to 16 bit + colorImage.image.at(row, col) = gradientBackground16.at(row, col); + } + } + } + } else { + RCLCPP_DEBUG_STREAM(node_->get_logger(),"Cannot convert to color: " << inputImage.encoding.c_str()); + colorImage = inputImage; + } + + return colorImage; +} + +std::shared_ptr PointCloud2SubscriberType::create_subscriber(rclcpp::Node::SharedPtr node) +{ + return std::shared_ptr( + new PointCloud2Subscriber(node)); +} + +} \ No newline at end of file diff --git a/src/subscribers/ros_subscriber.cpp b/src/subscribers/ros_subscriber.cpp new file mode 100644 index 0000000..dedd019 --- /dev/null +++ b/src/subscribers/ros_subscriber.cpp @@ -0,0 +1,51 @@ +#include "web_video_server/subscribers/ros_subscriber.hpp" + +namespace web_video_server +{ + +RosSubscriber::RosSubscriber(rclcpp::Node::SharedPtr node) +{ + node_ = node; +} + +RosSubscriber::~RosSubscriber() +{ +} + +void RosSubscriber::subscribe(const async_web_server_cpp::HttpRequest &request, + const std::string& topic, + const ImageCallback& callback) +{ + std::scoped_lock lock(subscriber_mutex_); + + callback_ = callback; + qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); + + // Get QoS profile from query parameter + RCLCPP_INFO( + node_->get_logger(), "Streaming topic %s with QoS profile %s", topic.c_str(), + qos_profile_name_.c_str()); + auto qos_profile = get_qos_profile_from_name(qos_profile_name_); + if (!qos_profile) { + qos_profile = rmw_qos_profile_default; + RCLCPP_ERROR( + node_->get_logger(), + "Invalid QoS profile %s specified. Using default profile.", + qos_profile_name_.c_str()); + } + + rclcpp::QoS qos = rclcpp::QoS( + rclcpp::QoSInitialization(qos_profile.value().history, 1), + qos_profile.value()); + + ros_sub_ = node_->create_subscription(topic, qos, std::bind(&RosSubscriber::subscriberCallback, this, std::placeholders::_1)); +} + + +void RosSubscriber::subscriberCallback(const sensor_msgs::msg::Image::ConstSharedPtr &input_msg) +{ + std::scoped_lock lock(subscriber_mutex_); + callback_(input_msg); +} + +} \ No newline at end of file diff --git a/src/vp8_streamer.cpp b/src/vp8_streamer.cpp index c4c1d0c..e182fe2 100644 --- a/src/vp8_streamer.cpp +++ b/src/vp8_streamer.cpp @@ -42,6 +42,7 @@ Vp8Streamer::Vp8Streamer( } Vp8Streamer::~Vp8Streamer() { + std::scoped_lock lock(send_mutex_); } void Vp8Streamer::initializeEncoder() diff --git a/src/vp9_streamer.cpp b/src/vp9_streamer.cpp index 65511a7..8b7acd7 100644 --- a/src/vp9_streamer.cpp +++ b/src/vp9_streamer.cpp @@ -40,6 +40,7 @@ Vp9Streamer::Vp9Streamer( } Vp9Streamer::~Vp9Streamer() { + std::scoped_lock lock(send_mutex_); } void Vp9Streamer::initializeEncoder() diff --git a/src/web_video_server.cpp b/src/web_video_server.cpp index 24db2fb..b5cb8bd 100644 --- a/src/web_video_server.cpp +++ b/src/web_video_server.cpp @@ -48,7 +48,7 @@ #include "async_web_server_cpp/http_reply.hpp" using namespace std::chrono_literals; -using namespace boost::placeholders; // NOLINT +using namespace std::placeholders; // NOLINT namespace web_video_server { @@ -63,6 +63,11 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options) declare_parameter("server_threads", 1); declare_parameter("publish_rate", -1.0); declare_parameter("default_stream_type", "mjpeg"); + declare_parameter("wait_for_tf_delay", 0.1); + declare_parameter("frame_id", ""); + declare_parameter("normalize", true); + declare_parameter("colorize", true); + declare_parameter("field", "depth"); get_parameter("port", port_); get_parameter("verbose", verbose_); @@ -79,36 +84,10 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options) stream_types_["h264"] = std::make_shared(); stream_types_["vp9"] = std::make_shared(); - handler_group_.addHandlerForPath( - "/", - boost::bind(&WebVideoServer::handle_list_streams, this, _1, _2, _3, _4)); - handler_group_.addHandlerForPath( - "/stream", - boost::bind(&WebVideoServer::handle_stream, this, _1, _2, _3, _4)); - handler_group_.addHandlerForPath( - "/stream_viewer", - boost::bind(&WebVideoServer::handle_stream_viewer, this, _1, _2, _3, _4)); - handler_group_.addHandlerForPath( - "/snapshot", - boost::bind(&WebVideoServer::handle_snapshot, this, _1, _2, _3, _4)); - - try { - server_.reset( - new async_web_server_cpp::HttpServer( - address_, std::to_string(port_), - boost::bind(&WebVideoServer::handle_request, this, _1, _2, _3, _4), - server_threads - ) - ); - } catch (boost::exception & e) { - RCLCPP_ERROR( - get_logger(), "Exception when creating the web server! %s:%d", - address_.c_str(), port_); - throw; - } - - RCLCPP_INFO(get_logger(), "Waiting For connections on %s:%d", address_.c_str(), port_); + initializeHttpServer(server_threads); + + // Initialize timers if (publish_rate_ > 0) { create_wall_timer(1s / publish_rate_, [this]() {restreamFrames(1s / publish_rate_);}); } @@ -125,26 +104,26 @@ WebVideoServer::~WebVideoServer() void WebVideoServer::restreamFrames(std::chrono::duration max_age) { - std::scoped_lock lock(subscriber_mutex_); + std::scoped_lock lock(streamer_mutex_); - for (auto & subscriber : image_subscribers_) { - subscriber->restreamFrame(max_age); + for (auto & streamer : image_streamers_) { + streamer->restreamFrame(max_age); } } void WebVideoServer::cleanup_inactive_streams() { - std::unique_lock lock(subscriber_mutex_, std::try_to_lock); + std::unique_lock lock(streamer_mutex_, std::try_to_lock); if (lock) { auto new_end = std::partition( - image_subscribers_.begin(), image_subscribers_.end(), + image_streamers_.begin(), image_streamers_.end(), [](const std::shared_ptr & streamer) {return !streamer->isInactive();}); if (verbose_) { - for (auto itr = new_end; itr < image_subscribers_.end(); ++itr) { + for (auto itr = new_end; itr < image_streamers_.end(); ++itr) { RCLCPP_INFO(get_logger(), "Removed Stream: %s", (*itr)->getTopic().c_str()); } } - image_subscribers_.erase(new_end, image_subscribers_.end()); + image_streamers_.erase(new_end, image_streamers_.end()); } } @@ -170,7 +149,9 @@ bool WebVideoServer::handle_stream( async_web_server_cpp::HttpConnectionPtr connection, const char * begin, const char * end) { + RCLCPP_INFO_STREAM(get_logger(), "Request: " << request.uri); std::string type = request.get_query_param_value_or_default("type", default_stream_type_); + RCLCPP_INFO_STREAM(get_logger(), " Received request for stream of type: " << type); if (stream_types_.find(type) != stream_types_.end()) { std::string topic = request.get_query_param_value_or_default("topic", ""); // Fallback for topics without corresponding compressed topics @@ -198,11 +179,12 @@ bool WebVideoServer::handle_stream( type = "mjpeg"; } } + RCLCPP_INFO_STREAM(get_logger(), " Starting stream of type: " << type); std::shared_ptr streamer = stream_types_[type]->create_streamer( request, connection, shared_from_this()); streamer->start(); - std::scoped_lock lock(subscriber_mutex_); - image_subscribers_.push_back(streamer); + std::scoped_lock lock(streamer_mutex_); + image_streamers_.push_back(streamer); } else { async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found)( request, connection, begin, end); @@ -219,8 +201,8 @@ bool WebVideoServer::handle_snapshot( request, connection, shared_from_this()); streamer->start(); - std::scoped_lock lock(subscriber_mutex_); - image_subscribers_.push_back(streamer); + std::scoped_lock lock(streamer_mutex_); + image_streamers_.push_back(streamer); return true; } @@ -284,6 +266,7 @@ bool WebVideoServer::handle_list_streams( { std::vector image_topics; std::vector camera_info_topics; + std::vector pointcloud2_topics; auto tnat = get_topic_names_and_types(); for (auto topic_and_types : tnat) { if (topic_and_types.second.size() > 1) { @@ -297,6 +280,8 @@ bool WebVideoServer::handle_list_streams( image_topics.push_back(topic_name); } else if (topic_type == "sensor_msgs/msg/CameraInfo") { camera_info_topics.push_back(topic_name); + } else if (topic_type == "sensor_msgs/msg/PointCloud2") { + pointcloud2_topics.push_back(topic_name); } } @@ -331,7 +316,7 @@ bool WebVideoServer::handle_list_streams( connection->write(" ("); connection->write("write(*image_topic_itr); - connection->write("\">Stream) ("); + connection->write("\">HTTP Stream) ("); connection->write("write(*image_topic_itr); connection->write("\">Snapshot)"); @@ -358,7 +343,7 @@ bool WebVideoServer::handle_list_streams( connection->write(" ("); connection->write("write(*image_topic_itr); - connection->write("\">Stream) ("); + connection->write("\">HTTP Stream) ("); connection->write("write(*image_topic_itr); connection->write("\">Snapshot)"); @@ -366,10 +351,70 @@ bool WebVideoServer::handle_list_streams( image_topic_itr = image_topics.erase(image_topic_itr); } + connection->write(""); + + connection->write( + "" + "ROS PointCloud2 Topic List" + "

Available ROS PointCloud2 Topics:

"); + //Add the pointcloud2 topics + connection->write(""); + + //End connection->write(""); return true; } +void WebVideoServer::initializeHttpServer(int server_threads) { + // Setup HTTP request handlers + handler_group_.addHandlerForPath( + "/", + boost::bind(&WebVideoServer::handle_list_streams, this, _1, _2, _3, _4)); + handler_group_.addHandlerForPath( + "/stream", + boost::bind(&WebVideoServer::handle_stream, this, _1, _2, _3, _4)); + handler_group_.addHandlerForPath( + "/stream_viewer", + boost::bind(&WebVideoServer::handle_stream_viewer, this, _1, _2, _3, _4)); + handler_group_.addHandlerForPath( + "/snapshot", + boost::bind(&WebVideoServer::handle_snapshot, this, _1, _2, _3, _4)); + + try { + server_.reset( + new async_web_server_cpp::HttpServer( + address_, std::to_string(port_), + boost::bind(&WebVideoServer::handle_request, this, _1, _2, _3, _4), + server_threads + ) + ); + RCLCPP_INFO(get_logger(), "HTTP server initialized on %s:%d", address_.c_str(), port_); + } catch (boost::exception & e) { + RCLCPP_ERROR( + get_logger(), "Exception when creating the HTTP server! %s:%d", + address_.c_str(), port_); + throw; + } +} + } // namespace web_video_server #include "rclcpp_components/register_node_macro.hpp"