diff --git a/CMakeLists.txt b/CMakeLists.txt index 766b4a2..1befbf4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,8 +10,10 @@ find_package(ament_cmake_ros REQUIRED) find_package(async_web_server_cpp REQUIRED) find_package(cv_bridge REQUIRED) find_package(image_transport REQUIRED) +find_package(pluginlib REQUIRED) find_package(rclcpp REQUIRED) find_package(rclcpp_components REQUIRED) +find_package(rmw REQUIRED) find_package(sensor_msgs REQUIRED) find_package(OpenCV REQUIRED) @@ -39,36 +41,60 @@ if(${cv_bridge_VERSION} VERSION_LESS "3.3.0") add_compile_definitions(CV_BRIDGE_USES_OLD_HEADERS) endif() -## Specify additional locations of header files -include_directories(include - ${avcodec_INCLUDE_DIRS} - ${avformat_INCLUDE_DIRS} - ${avutil_INCLUDE_DIRS} - ${swscale_INCLUDE_DIRS} -) - ## Declare a cpp library add_library(${PROJECT_NAME} SHARED src/web_video_server.cpp - src/image_streamer.cpp - src/libav_streamer.cpp - src/vp8_streamer.cpp - src/h264_streamer.cpp - src/vp9_streamer.cpp src/multipart_stream.cpp - src/ros_compressed_streamer.cpp - src/jpeg_streamers.cpp - src/png_streamers.cpp + src/streamer.cpp src/utils.cpp ) +target_include_directories(${PROJECT_NAME} + PUBLIC + "$" + "$" +) + ## Specify libraries to link a library or executable target against target_link_libraries(${PROJECT_NAME} + PUBLIC + async_web_server_cpp::async_web_server_cpp + pluginlib::pluginlib + rclcpp::rclcpp + rmw::rmw + Boost::boost + PRIVATE + rclcpp_components::component +) + +add_library(${PROJECT_NAME}_streamers SHARED + src/streamers/image_transport_streamer.cpp + src/streamers/libav_streamer.cpp + src/streamers/h264_streamer.cpp + src/streamers/jpeg_streamers.cpp + src/streamers/png_streamers.cpp + src/streamers/ros_compressed_streamer.cpp + src/streamers/vp8_streamer.cpp + src/streamers/vp9_streamer.cpp +) + +target_include_directories(${PROJECT_NAME}_streamers + PUBLIC + "$" + "$" + ${avcodec_INCLUDE_DIRS} + ${avformat_INCLUDE_DIRS} + ${avutil_INCLUDE_DIRS} + ${swscale_INCLUDE_DIRS} +) + +target_link_libraries(${PROJECT_NAME}_streamers + ${PROJECT_NAME} async_web_server_cpp::async_web_server_cpp cv_bridge::cv_bridge image_transport::image_transport + pluginlib::pluginlib rclcpp::rclcpp - rclcpp_components::component ${sensor_msgs_TARGETS} Boost::boost Boost::system @@ -90,24 +116,34 @@ target_link_libraries(${PROJECT_NAME}_node rclcpp_components_register_nodes(${PROJECT_NAME} "web_video_server::WebVideoServer") +pluginlib_export_plugin_description_file(web_video_server plugins.xml) + ############# ## Install ## ############# -## Mark executables and/or libraries for installation install( DIRECTORY include/ DESTINATION include/${PROJECT_NAME} ) install( - TARGETS ${PROJECT_NAME} + TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_streamers EXPORT export_${PROJECT_NAME} LIBRARY DESTINATION lib ARCHIVE DESTINATION lib RUNTIME DESTINATION bin ) ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET) +ament_export_dependencies( + async_web_server_cpp + cv_bridge + image_transport + pluginlib + rclcpp + rmw + sensor_msgs +) set_target_properties(${PROJECT_NAME}_node PROPERTIES OUTPUT_NAME ${PROJECT_NAME}) install( @@ -132,4 +168,6 @@ if(BUILD_TESTING) ament_lint_auto_find_test_dependencies() endif() -ament_package() +ament_package( + CONFIG_EXTRAS "config-extras.cmake" +) diff --git a/config-extras.cmake b/config-extras.cmake new file mode 100644 index 0000000..79f31b6 --- /dev/null +++ b/config-extras.cmake @@ -0,0 +1,32 @@ +# Copyright (c) 2025, The Robot Web Tools Contributors +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +# ament_export_dependencies fails to propagate Boost components +# so we need to explicitly find Boost here +find_package(Boost REQUIRED COMPONENTS system) diff --git a/include/web_video_server/multipart_stream.hpp b/include/web_video_server/multipart_stream.hpp index fae1352..4e5c8cf 100644 --- a/include/web_video_server/multipart_stream.hpp +++ b/include/web_video_server/multipart_stream.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -30,12 +30,15 @@ #pragma once +#include +#include #include #include #include #include -#include "rclcpp/rclcpp.hpp" +#include + #include "async_web_server_cpp/http_connection.hpp" namespace web_video_server @@ -47,34 +50,37 @@ struct PendingFooter std::weak_ptr contents; }; +/** + * Helper class to manage sending multipart HTTP responses. + */ class MultipartStream { public: MultipartStream( async_web_server_cpp::HttpConnectionPtr & connection, - const std::string & boundry = "boundarydonotcross", + const std::string & boundary = "boundarydonotcross", std::size_t max_queue_size = 1); - void sendInitialHeader(); - void sendPartHeader( + void send_initial_header(); + void send_part_header( const std::chrono::steady_clock::time_point & time, const std::string & type, size_t payload_size); - void sendPartFooter(const std::chrono::steady_clock::time_point & time); - void sendPartAndClear( + void send_part_footer(const std::chrono::steady_clock::time_point & time); + void send_part_and_clear( const std::chrono::steady_clock::time_point & time, const std::string & type, std::vector & data); - void sendPart( + void send_part( const std::chrono::steady_clock::time_point & time, const std::string & type, const boost::asio::const_buffer & buffer, async_web_server_cpp::HttpConnection::ResourcePtr resource); private: - bool isBusy(); + bool is_busy(); private: const std::size_t max_queue_size_; async_web_server_cpp::HttpConnectionPtr connection_; - std::string boundry_; + std::string boundary_; std::queue pending_footers_; }; diff --git a/include/web_video_server/streamer.hpp b/include/web_video_server/streamer.hpp new file mode 100644 index 0000000..c7b6734 --- /dev/null +++ b/include/web_video_server/streamer.hpp @@ -0,0 +1,145 @@ +// Copyright (c) 2014, Worcester Polytechnic Institute +// Copyright (c) 2024-2025, The Robot Web Tools Contributors +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include +#include + +#include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/logger.hpp" +#include "rclcpp/node.hpp" + +namespace web_video_server +{ + +/** + * @brief A common interface for all streaming plugins. + */ +class StreamerInterface +{ +public: + StreamerInterface( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node, + std::string logger_name = "streamer"); + virtual ~StreamerInterface(); + + /** + * @brief Starts the streaming process. + */ + virtual void start() = 0; + + /** + * @brief Returns true if the streamer is inactive and should be deleted. + * + * This could be because the connection was closed or snapshot was successfully sent (in case + * of snapshot streamers). + */ + bool is_inactive() + { + return inactive_; + } + + /** + * @brief Restreams the last received image frame if older than max_age. + */ + virtual void restream_frame(std::chrono::duration max_age) = 0; + + /** + * @brief Returns the topic being streamed. + */ + std::string get_topic() + { + return topic_; + } + +protected: + rclcpp::Node::SharedPtr lock_node() const; + + async_web_server_cpp::HttpConnectionPtr connection_; + async_web_server_cpp::HttpRequest request_; + rclcpp::Node::WeakPtr node_; + rclcpp::Logger logger_; + bool inactive_; + std::string topic_; +}; + +/** + * @brief A factory interface for creating Streamer instances. + */ +class StreamerFactoryInterface +{ +public: + virtual ~StreamerFactoryInterface() = default; + + /** + * @brief Returns the type of streamer created by this factory. + * + * This should match the "type" query parameter used to select the streamer. + */ + virtual std::string get_type() = 0; + + /** + * @brief Creates a new Streamer instance. + * @param request The HTTP request that initiated the streamer. + * @param connection The HTTP connection to use for streaming. + * @param node The ROS2 node to use for subscribing to topics. + * @return A shared pointer to the created Streamer instance. + */ + virtual std::shared_ptr create_streamer( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node) = 0; + + /** + * @brief Creates HTML code for embedding a viewer for this streamer. + * @param request The HTTP request that initiated the viewer. + */ + virtual std::string create_viewer(const async_web_server_cpp::HttpRequest & request); + + /** + * @brief Returns a list of available topics that can be streamed by this streamer. + * @param node The ROS2 node to use for discovering topics. + * @return A vector of topic names. + */ + virtual std::vector get_available_topics(rclcpp::Node & node); +}; + +/** + * @brief A factory interface for creating snapshot Streamer instances. + */ +class SnapshotStreamerFactoryInterface : public StreamerFactoryInterface {}; + +} // namespace web_video_server diff --git a/include/web_video_server/h264_streamer.hpp b/include/web_video_server/streamers/h264_streamer.hpp similarity index 79% rename from include/web_video_server/h264_streamer.hpp rename to include/web_video_server/streamers/h264_streamer.hpp index 3df01b4..e6cc088 100644 --- a/include/web_video_server/h264_streamer.hpp +++ b/include/web_video_server/streamers/h264_streamer.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -32,36 +32,41 @@ #include #include -#include "image_transport/image_transport.hpp" -#include "web_video_server/libav_streamer.hpp" #include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "rclcpp/node.hpp" + +#include "web_video_server/streamer.hpp" +#include "web_video_server/streamers/libav_streamer.hpp" namespace web_video_server { +namespace streamers +{ -class H264Streamer : public LibavStreamer +class H264Streamer : public LibavStreamerBase { public: H264Streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); ~H264Streamer(); protected: - virtual void initializeEncoder(); + virtual void initialize_encoder(); std::string preset_; }; -class H264StreamerType : public LibavStreamerType +class H264StreamerFactory : public LibavStreamerFactoryBase { public: - H264StreamerType(); - std::shared_ptr create_streamer( + std::string get_type() {return "h264";} + std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); }; +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/image_streamer.hpp b/include/web_video_server/streamers/image_transport_streamer.hpp similarity index 55% rename from include/web_video_server/image_streamer.hpp rename to include/web_video_server/streamers/image_transport_streamer.hpp index 0e001c5..d718b42 100644 --- a/include/web_video_server/image_streamer.hpp +++ b/include/web_video_server/streamers/image_transport_streamer.hpp @@ -1,5 +1,4 @@ -// Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -32,71 +31,46 @@ #include #include +#include #include +#include -#include +#include -#include "rclcpp/rclcpp.hpp" -#include "image_transport/image_transport.hpp" -#include "image_transport/transport_hints.hpp" -#include "web_video_server/utils.hpp" -#include "async_web_server_cpp/http_server.hpp" +#include "async_web_server_cpp/http_connection.hpp" #include "async_web_server_cpp/http_request.hpp" +#include "image_transport/image_transport.hpp" +#include "image_transport/subscriber.hpp" +#include "rclcpp/node.hpp" +#include "sensor_msgs/msg/image.hpp" + +#include "web_video_server/streamer.hpp" namespace web_video_server { - -class ImageStreamer +namespace streamers { -public: - ImageStreamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); - - virtual void start() = 0; - virtual ~ImageStreamer(); - - bool isInactive() - { - return inactive_; - } - - /** - * Restreams the last received image frame if older than max_age. - */ - virtual void restreamFrame(std::chrono::duration max_age) = 0; - - std::string getTopic() - { - return topic_; - } -protected: - async_web_server_cpp::HttpConnectionPtr connection_; - async_web_server_cpp::HttpRequest request_; - rclcpp::Node::SharedPtr node_; - bool inactive_; - image_transport::Subscriber image_sub_; - std::string topic_; -}; - - -class ImageTransportImageStreamer : public ImageStreamer +/** + * @brief A common base class for all streaming plugins using image_transport to subscribe to image + * topics. + */ +class ImageTransportStreamerBase : public StreamerInterface { public: - ImageTransportImageStreamer( + ImageTransportStreamerBase( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); - virtual ~ImageTransportImageStreamer(); + rclcpp::Node::WeakPtr node, + std::string logger_name = "image_transport_streamer"); + virtual ~ImageTransportStreamerBase(); virtual void start(); + virtual void restream_frame(std::chrono::duration max_age); protected: - virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); - virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time) = 0; - virtual void restreamFrame(std::chrono::duration max_age); + virtual cv::Mat decode_image(const sensor_msgs::msg::Image::ConstSharedPtr & msg); + virtual void send_image(const cv::Mat &, const std::chrono::steady_clock::time_point & time) = 0; virtual void initialize(const cv::Mat &); image_transport::Subscriber image_sub_; @@ -111,21 +85,25 @@ class ImageTransportImageStreamer : public ImageStreamer std::mutex send_mutex_; private: - image_transport::ImageTransport it_; bool initialized_; - void imageCallback(const sensor_msgs::msg::Image::ConstSharedPtr & msg); + void image_callback(const sensor_msgs::msg::Image::ConstSharedPtr & msg); + void try_send_image( + const cv::Mat & img, const std::chrono::steady_clock::time_point & time, + rclcpp::Node & node); }; -class ImageStreamerType +class ImageTransportStreamerFactoryBase : public StreamerFactoryInterface { public: - virtual std::shared_ptr create_streamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node) = 0; + virtual std::vector get_available_topics(rclcpp::Node & node); +}; - virtual std::string create_viewer(const async_web_server_cpp::HttpRequest & request) = 0; +class ImageTransportSnapshotStreamerFactoryBase : public SnapshotStreamerFactoryInterface +{ +public: + virtual std::vector get_available_topics(rclcpp::Node & node); }; +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/jpeg_streamers.hpp b/include/web_video_server/streamers/jpeg_streamers.hpp similarity index 65% rename from include/web_video_server/jpeg_streamers.hpp rename to include/web_video_server/streamers/jpeg_streamers.hpp index 364357f..7e060a0 100644 --- a/include/web_video_server/jpeg_streamers.hpp +++ b/include/web_video_server/streamers/jpeg_streamers.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -30,58 +30,78 @@ #pragma once +#include #include #include -#include "image_transport/image_transport.hpp" -#include "web_video_server/image_streamer.hpp" +#include + #include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "rclcpp/node.hpp" + #include "web_video_server/multipart_stream.hpp" +#include "web_video_server/streamer.hpp" +#include "web_video_server/streamers/image_transport_streamer.hpp" namespace web_video_server { +namespace streamers +{ -class MjpegStreamer : public ImageTransportImageStreamer +class MjpegStreamer : public ImageTransportStreamerBase { public: MjpegStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); ~MjpegStreamer(); protected: - virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); + virtual void send_image(const cv::Mat &, const std::chrono::steady_clock::time_point & time); private: MultipartStream stream_; int quality_; }; -class MjpegStreamerType : public ImageStreamerType +class MjpegStreamerFactory : public ImageTransportStreamerFactoryBase { public: - std::shared_ptr create_streamer( + std::string get_type() {return "mjpeg";} + std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); - std::string create_viewer(const async_web_server_cpp::HttpRequest & request); + rclcpp::Node::WeakPtr node); }; -class JpegSnapshotStreamer : public ImageTransportImageStreamer +class JpegSnapshotStreamer : public ImageTransportStreamerBase { public: JpegSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node); + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node); ~JpegSnapshotStreamer(); protected: - virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); + virtual void send_image(const cv::Mat &, const std::chrono::steady_clock::time_point & time); private: int quality_; }; +class JpegSnapshotStreamerFactory : public ImageTransportSnapshotStreamerFactoryBase +{ +public: + std::string get_type() {return "jpeg";} + + std::shared_ptr create_streamer( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node); +}; + +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/libav_streamer.hpp b/include/web_video_server/streamers/libav_streamer.hpp similarity index 69% rename from include/web_video_server/libav_streamer.hpp rename to include/web_video_server/streamers/libav_streamer.hpp index 12914bb..38674e5 100644 --- a/include/web_video_server/libav_streamer.hpp +++ b/include/web_video_server/streamers/libav_streamer.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -33,41 +33,52 @@ extern "C" { #include +#include #include -#include -#include +#include +#include #include -#include -#include -#include } #include -#include +#include +#include #include -#include "image_transport/image_transport.hpp" -#include "web_video_server/image_streamer.hpp" -#include "async_web_server_cpp/http_request.hpp" +#include + #include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/node.hpp" + +#include "web_video_server/streamers/image_transport_streamer.hpp" namespace web_video_server { +namespace streamers +{ -class LibavStreamer : public ImageTransportImageStreamer +/** + * @brief A common base class for all streaming plugins using image_transport to subscribe to image + * topics and libav to encode and stream video. + */ +class LibavStreamerBase : public ImageTransportStreamerBase { public: - LibavStreamer( + LibavStreamerBase( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node, const std::string & format_name, const std::string & codec_name, + rclcpp::Node::WeakPtr node, + std::string logger_name, + const std::string & format_name, + const std::string & codec_name, const std::string & content_type); - ~LibavStreamer(); + ~LibavStreamerBase(); protected: - virtual void initializeEncoder(); - virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); + virtual void initialize_encoder() = 0; + virtual void send_image(const cv::Mat &, const std::chrono::steady_clock::time_point & time); virtual void initialize(const cv::Mat &); AVFormatContext * format_context_; const AVCodec * codec_; @@ -94,24 +105,11 @@ class LibavStreamer : public ImageTransportImageStreamer uint8_t * io_buffer_; // custom IO buffer }; -class LibavStreamerType : public ImageStreamerType +class LibavStreamerFactoryBase : public ImageTransportStreamerFactoryBase { public: - LibavStreamerType( - const std::string & format_name, const std::string & codec_name, - const std::string & content_type); - - std::shared_ptr create_streamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); - - std::string create_viewer(const async_web_server_cpp::HttpRequest & request); - -private: - const std::string format_name_; - const std::string codec_name_; - const std::string content_type_; + virtual std::string create_viewer(const async_web_server_cpp::HttpRequest & request); }; +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/png_streamers.hpp b/include/web_video_server/streamers/png_streamers.hpp similarity index 61% rename from include/web_video_server/png_streamers.hpp rename to include/web_video_server/streamers/png_streamers.hpp index c261fe6..297b7a4 100644 --- a/include/web_video_server/png_streamers.hpp +++ b/include/web_video_server/streamers/png_streamers.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -29,60 +29,81 @@ #pragma once +#include #include #include -#include "image_transport/image_transport.hpp" -#include "web_video_server/image_streamer.hpp" +#include + #include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "rclcpp/node.hpp" +#include "sensor_msgs/msg/image.hpp" + #include "web_video_server/multipart_stream.hpp" +#include "web_video_server/streamer.hpp" +#include "web_video_server/streamers/image_transport_streamer.hpp" namespace web_video_server { +namespace streamers +{ -class PngStreamer : public ImageTransportImageStreamer +class PngStreamer : public ImageTransportStreamerBase { public: PngStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); ~PngStreamer(); protected: - virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); - virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); + virtual void send_image(const cv::Mat &, const std::chrono::steady_clock::time_point & time); + virtual cv::Mat decode_image(const sensor_msgs::msg::Image::ConstSharedPtr & msg); private: MultipartStream stream_; int quality_; }; -class PngStreamerType : public ImageStreamerType +class PngStreamerFactory : public ImageTransportStreamerFactoryBase { public: - std::shared_ptr create_streamer( + std::string get_type() {return "png";} + std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); - std::string create_viewer(const async_web_server_cpp::HttpRequest & request); + rclcpp::Node::WeakPtr node); }; -class PngSnapshotStreamer : public ImageTransportImageStreamer +class PngSnapshotStreamer : public ImageTransportStreamerBase { public: PngSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node); + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node); ~PngSnapshotStreamer(); protected: - virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time); - virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); + virtual void send_image(const cv::Mat &, const std::chrono::steady_clock::time_point & time); + virtual cv::Mat decode_image(const sensor_msgs::msg::Image::ConstSharedPtr & msg); private: int quality_; }; +class PngSnapshotStreamerFactory : public ImageTransportSnapshotStreamerFactoryBase +{ +public: + std::string get_type() {return "png";} + + std::shared_ptr create_streamer( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node); +}; + +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/ros_compressed_streamer.hpp b/include/web_video_server/streamers/ros_compressed_streamer.hpp similarity index 57% rename from include/web_video_server/ros_compressed_streamer.hpp rename to include/web_video_server/streamers/ros_compressed_streamer.hpp index 9fc51bd..c8f1bdf 100644 --- a/include/web_video_server/ros_compressed_streamer.hpp +++ b/include/web_video_server/streamers/ros_compressed_streamer.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -30,36 +30,44 @@ #pragma once +#include #include +#include #include +#include -#include "sensor_msgs/msg/compressed_image.hpp" -#include "web_video_server/image_streamer.hpp" -#include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/subscription.hpp" +#include "sensor_msgs/msg/compressed_image.hpp" + #include "web_video_server/multipart_stream.hpp" +#include "web_video_server/streamer.hpp" namespace web_video_server { +namespace streamers +{ -class RosCompressedStreamer : public ImageStreamer +class RosCompressedStreamer : public StreamerInterface { public: RosCompressedStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); ~RosCompressedStreamer(); virtual void start(); - virtual void restreamFrame(std::chrono::duration max_age); + virtual void restream_frame(std::chrono::duration max_age); protected: - virtual void sendImage( + virtual void send_image( const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, const std::chrono::steady_clock::time_point & time); private: - void imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg); + void image_callback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg); MultipartStream stream_; rclcpp::Subscription::SharedPtr image_sub_; std::chrono::steady_clock::time_point last_frame_; @@ -68,14 +76,50 @@ class RosCompressedStreamer : public ImageStreamer std::string qos_profile_name_; }; -class RosCompressedStreamerType : public ImageStreamerType +class RosCompressedStreamerFactory : public StreamerFactoryInterface +{ +public: + std::string get_type() {return "ros_compressed";} + std::shared_ptr create_streamer( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node); + std::vector get_available_topics(rclcpp::Node & node); +}; + +class RosCompressedSnapshotStreamer : public StreamerInterface +{ +public: + RosCompressedSnapshotStreamer( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node); + ~RosCompressedSnapshotStreamer(); + virtual void start(); + virtual void restream_frame(std::chrono::duration max_age); + +protected: + virtual void send_image( + const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, + const std::chrono::steady_clock::time_point & time); + +private: + void image_callback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg); + + rclcpp::Subscription::SharedPtr image_sub_; + std::string qos_profile_name_; +}; + +class RosCompressedSnapshotStreamerFactory : public SnapshotStreamerFactoryInterface { public: - std::shared_ptr create_streamer( + std::string get_type() {return "ros_compressed";} + std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); - std::string create_viewer(const async_web_server_cpp::HttpRequest & request); + rclcpp::Node::WeakPtr node); + std::vector get_available_topics(rclcpp::Node & node); }; +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/vp8_streamer.hpp b/include/web_video_server/streamers/vp8_streamer.hpp similarity index 80% rename from include/web_video_server/vp8_streamer.hpp rename to include/web_video_server/streamers/vp8_streamer.hpp index ad6fd2f..aeee06a 100644 --- a/include/web_video_server/vp8_streamer.hpp +++ b/include/web_video_server/streamers/vp8_streamer.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -33,38 +33,43 @@ #include #include -#include "image_transport/image_transport.hpp" -#include "web_video_server/libav_streamer.hpp" -#include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/node.hpp" + +#include "web_video_server/streamer.hpp" +#include "web_video_server/streamers/libav_streamer.hpp" namespace web_video_server { +namespace streamers +{ -class Vp8Streamer : public LibavStreamer +class Vp8Streamer : public LibavStreamerBase { public: Vp8Streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); ~Vp8Streamer(); protected: - virtual void initializeEncoder(); + virtual void initialize_encoder(); private: std::string quality_; }; -class Vp8StreamerType : public LibavStreamerType +class Vp8StreamerFactory : public LibavStreamerFactoryBase { public: - Vp8StreamerType(); - std::shared_ptr create_streamer( + std::string get_type() {return "vp8";} + std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); }; +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/vp9_streamer.hpp b/include/web_video_server/streamers/vp9_streamer.hpp similarity index 78% rename from include/web_video_server/vp9_streamer.hpp rename to include/web_video_server/streamers/vp9_streamer.hpp index 5491db7..97d5e73 100644 --- a/include/web_video_server/vp9_streamer.hpp +++ b/include/web_video_server/streamers/vp9_streamer.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -30,36 +30,42 @@ #pragma once #include +#include -#include "image_transport/image_transport.hpp" -#include "web_video_server/libav_streamer.hpp" -#include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/node.hpp" + +#include "web_video_server/streamer.hpp" +#include "web_video_server/streamers/libav_streamer.hpp" namespace web_video_server { +namespace streamers +{ -class Vp9Streamer : public LibavStreamer +class Vp9Streamer : public LibavStreamerBase { public: Vp9Streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); ~Vp9Streamer(); protected: - virtual void initializeEncoder(); + virtual void initialize_encoder(); }; -class Vp9StreamerType : public LibavStreamerType +class Vp9StreamerFactory : public LibavStreamerFactoryBase { public: - Vp9StreamerType(); - std::shared_ptr create_streamer( + std::string get_type() {return "vp9";} + std::shared_ptr create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node); + rclcpp::Node::WeakPtr node); }; +} // namespace streamers } // namespace web_video_server diff --git a/include/web_video_server/utils.hpp b/include/web_video_server/utils.hpp index 13fe6d4..c9282b0 100644 --- a/include/web_video_server/utils.hpp +++ b/include/web_video_server/utils.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -31,7 +31,8 @@ #include #include -#include "rmw/qos_profiles.h" + +#include "rmw/types.h" namespace web_video_server { diff --git a/include/web_video_server/web_video_server.hpp b/include/web_video_server/web_video_server.hpp index 6e9bd95..6a2de32 100644 --- a/include/web_video_server/web_video_server.hpp +++ b/include/web_video_server/web_video_server.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -30,22 +30,23 @@ #pragma once +#include #include #include +#include #include #include -#ifdef CV_BRIDGE_USES_OLD_HEADERS -#include "cv_bridge/cv_bridge.h" -#else -#include "cv_bridge/cv_bridge.hpp" -#endif - -#include "rclcpp/rclcpp.hpp" -#include "web_video_server/image_streamer.hpp" -#include "async_web_server_cpp/http_server.hpp" -#include "async_web_server_cpp/http_request.hpp" #include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "async_web_server_cpp/http_request_handler.hpp" +#include "async_web_server_cpp/http_server.hpp" +#include "pluginlib/class_loader.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/node_options.hpp" +#include "rclcpp/timer.hpp" + +#include "web_video_server/streamer.hpp" namespace web_video_server { @@ -94,9 +95,10 @@ class WebVideoServer : public rclcpp::Node const char * begin, const char * end); private: - void restreamFrames(std::chrono::duration max_age); + void restream_frames(std::chrono::duration max_age); void cleanup_inactive_streams(); + rclcpp::TimerBase::SharedPtr restream_timer_; rclcpp::TimerBase::SharedPtr cleanup_timer_; // Parameters @@ -106,13 +108,17 @@ class WebVideoServer : public rclcpp::Node std::string address_; bool verbose_; std::string default_stream_type_; + std::string default_snapshot_type_; std::shared_ptr server_; async_web_server_cpp::HttpRequestHandlerGroup handler_group_; - std::vector> image_subscribers_; - std::map> stream_types_; - std::mutex subscriber_mutex_; + std::vector> streamers_; + pluginlib::ClassLoader streamer_factory_loader_; + std::map> streamer_factories_; + pluginlib::ClassLoader snapshot_streamer_factory_loader_; + std::map> snapshot_streamer_factories_; + std::mutex streamers_mutex_; }; } // namespace web_video_server diff --git a/package.xml b/package.xml index 32596b9..2bc3495 100644 --- a/package.xml +++ b/package.xml @@ -18,23 +18,22 @@ ament_cmake_ros pkg-config - rclcpp + + async_web_server_cpp + boost + cv_bridge + ffmpeg + image_transport + libopencv-dev + rclcpp + rmw + sensor_msgs + + + pluginlib + pluginlib rclcpp_components - cv_bridge - image_transport - async_web_server_cpp - ffmpeg - sensor_msgs - boost - - rclcpp rclcpp_components - cv_bridge - image_transport - async_web_server_cpp - ffmpeg - sensor_msgs - boost ament_lint_auto ament_cmake_copyright diff --git a/plugins.xml b/plugins.xml new file mode 100644 index 0000000..836afda --- /dev/null +++ b/plugins.xml @@ -0,0 +1,48 @@ + + + Streams images as multipart/x-mixed-replace (MJPEG). + + + Streams images as multipart/x-mixed-replace (PNG). + + + Streams images published using compressed_image_transport as + multipart/x-mixed-replace. + + + Streams images as H.264 encoded video in an MP4 container. + + + Streams images as VP8 encoded video in a WebM container. + + + Streams images as VP9 encoded video in a WebM container. + + + Provides single JPEG images. + + + Provides single PNG images. + + + Provides single images published using compressed_image_transport as JPEG or PNG. + + \ No newline at end of file diff --git a/src/multipart_stream.cpp b/src/multipart_stream.cpp index 0429272..07868b4 100644 --- a/src/multipart_stream.cpp +++ b/src/multipart_stream.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -29,6 +29,18 @@ // POSSIBILITY OF SUCH DAMAGE. #include "web_video_server/multipart_stream.hpp" + +#include +#include +#include +#include +#include +#include + +#include + +#include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_header.hpp" #include "async_web_server_cpp/http_reply.hpp" namespace web_video_server @@ -36,12 +48,12 @@ namespace web_video_server MultipartStream::MultipartStream( async_web_server_cpp::HttpConnectionPtr & connection, - const std::string & boundry, + const std::string & boundary, std::size_t max_queue_size) -: max_queue_size_(max_queue_size), connection_(connection), boundry_(boundry) +: max_queue_size_(max_queue_size), connection_(connection), boundary_(boundary) {} -void MultipartStream::sendInitialHeader() +void MultipartStream::send_initial_header() { async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok) .header("Connection", "close") @@ -50,13 +62,13 @@ void MultipartStream::sendInitialHeader() "Cache-Control", "no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0") .header("Pragma", "no-cache") - .header("Content-type", "multipart/x-mixed-replace;boundary=" + boundry_) + .header("Content-type", "multipart/x-mixed-replace;boundary=" + boundary_) .header("Access-Control-Allow-Origin", "*") .write(connection_); - connection_->write("--" + boundry_ + "\r\n"); + connection_->write("--" + boundary_ + "\r\n"); } -void MultipartStream::sendPartHeader( +void MultipartStream::send_part_header( const std::chrono::steady_clock::time_point & time, const std::string & type, size_t payload_size) { @@ -74,9 +86,9 @@ void MultipartStream::sendPartHeader( connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers); } -void MultipartStream::sendPartFooter(const std::chrono::steady_clock::time_point & time) +void MultipartStream::send_part_footer(const std::chrono::steady_clock::time_point & time) { - std::shared_ptr str(new std::string("\r\n--" + boundry_ + "\r\n")); + std::shared_ptr str(new std::string("\r\n--" + boundary_ + "\r\n")); PendingFooter pf; pf.timestamp = time; pf.contents = str; @@ -84,30 +96,30 @@ void MultipartStream::sendPartFooter(const std::chrono::steady_clock::time_point if (max_queue_size_ > 0) {pending_footers_.push(pf);} } -void MultipartStream::sendPartAndClear( +void MultipartStream::send_part_and_clear( const std::chrono::steady_clock::time_point & time, const std::string & type, std::vector & data) { - if (!isBusy()) { - sendPartHeader(time, type, data.size()); + if (!is_busy()) { + send_part_header(time, type, data.size()); connection_->write_and_clear(data); - sendPartFooter(time); + send_part_footer(time); } } -void MultipartStream::sendPart( +void MultipartStream::send_part( const std::chrono::steady_clock::time_point & time, const std::string & type, const boost::asio::const_buffer & buffer, async_web_server_cpp::HttpConnection::ResourcePtr resource) { - if (!isBusy()) { - sendPartHeader(time, type, boost::asio::buffer_size(buffer)); + if (!is_busy()) { + send_part_header(time, type, boost::asio::buffer_size(buffer)); connection_->write(buffer, resource); - sendPartFooter(time); + send_part_footer(time); } } -bool MultipartStream::isBusy() +bool MultipartStream::is_busy() { auto current_time = std::chrono::steady_clock::now(); while (!pending_footers_.empty()) { diff --git a/src/ros_compressed_streamer.cpp b/src/ros_compressed_streamer.cpp deleted file mode 100644 index 76c965d..0000000 --- a/src/ros_compressed_streamer.cpp +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// -// * Neither the name of the copyright holder nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. - -#include "web_video_server/ros_compressed_streamer.hpp" - -namespace web_video_server -{ - -RosCompressedStreamer::RosCompressedStreamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageStreamer(request, connection, node), stream_(connection) -{ - stream_.sendInitialHeader(); - qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); -} - -RosCompressedStreamer::~RosCompressedStreamer() -{ - this->inactive_ = true; - std::scoped_lock lock(send_mutex_); // protects sendImage. -} - -void RosCompressedStreamer::start() -{ - const std::string compressed_topic = topic_ + "/compressed"; - - // Get QoS profile from query parameter - RCLCPP_INFO( - node_->get_logger(), "Streaming topic %s with QoS profile %s", - compressed_topic.c_str(), qos_profile_name_.c_str()); - auto qos_profile = get_qos_profile_from_name(qos_profile_name_); - if (!qos_profile) { - qos_profile = rmw_qos_profile_default; - RCLCPP_ERROR( - node_->get_logger(), - "Invalid QoS profile %s specified. Using default profile.", - qos_profile_name_.c_str()); - } - - // Create subscriber - const auto qos = rclcpp::QoS( - rclcpp::QoSInitialization(qos_profile.value().history, 1), - qos_profile.value()); - image_sub_ = node_->create_subscription( - compressed_topic, qos, - std::bind(&RosCompressedStreamer::imageCallback, this, std::placeholders::_1)); -} - -void RosCompressedStreamer::restreamFrame(std::chrono::duration max_age) -{ - if (inactive_ || (last_msg == 0)) { - return; - } - - if (last_frame_ + max_age < std::chrono::steady_clock::now()) { - std::scoped_lock lock(send_mutex_); - // don't update last_frame, it may remain an old value. - sendImage(last_msg, std::chrono::steady_clock::now()); - } -} - -void RosCompressedStreamer::sendImage( - const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg, - const std::chrono::steady_clock::time_point & time) -{ - try { - std::string content_type; - if (msg->format.find("jpeg") != std::string::npos || - msg->format.find("jpg") != std::string::npos) - { - content_type = "image/jpeg"; - } else if (msg->format.find("png") != std::string::npos) { - content_type = "image/png"; - } else { - RCLCPP_WARN( - node_->get_logger(), "Unknown ROS compressed image format: %s", - msg->format.c_str()); - return; - } - - stream_.sendPart(time, content_type, boost::asio::buffer(msg->data), msg); - } catch (boost::system::system_error & e) { - // happens when client disconnects - RCLCPP_DEBUG(node_->get_logger(), "system_error exception: %s", e.what()); - inactive_ = true; - return; - } catch (std::exception & e) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "exception: %s", e.what()); - inactive_ = true; - return; - } catch (...) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "exception"); - inactive_ = true; - return; - } -} - - -void RosCompressedStreamer::imageCallback( - const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg) -{ - std::scoped_lock lock(send_mutex_); // protects last_msg and last_frame - last_msg = msg; - last_frame_ = std::chrono::steady_clock::now(); - sendImage(last_msg, last_frame_); -} - - -std::shared_ptr RosCompressedStreamerType::create_streamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node) -{ - return std::make_shared(request, connection, node); -} - -std::string RosCompressedStreamerType::create_viewer( - const async_web_server_cpp::HttpRequest & request) -{ - std::stringstream ss; - ss << ""; - return ss.str(); -} - -} // namespace web_video_server diff --git a/src/streamer.cpp b/src/streamer.cpp new file mode 100644 index 0000000..8fbf81d --- /dev/null +++ b/src/streamer.cpp @@ -0,0 +1,85 @@ +// Copyright (c) 2014, Worcester Polytechnic Institute +// Copyright (c) 2024-2025, The Robot Web Tools Contributors +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. + +#include "web_video_server/streamer.hpp" + +#include +#include + +#include "rclcpp/node.hpp" +#include "rclcpp/logging.hpp" + +#include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" + +namespace web_video_server +{ + +StreamerInterface::StreamerInterface( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node, + std::string logger_name) +: connection_(connection), request_(request), node_(std::move(node)), + logger_(node_.lock()->get_logger().get_child(logger_name)), inactive_(false), + topic_(request.get_query_param_value_or_default("topic", "")) +{ +} + +StreamerInterface::~StreamerInterface() +{ +} + +rclcpp::Node::SharedPtr StreamerInterface::lock_node() const +{ + auto node = node_.lock(); + if (!node) { + RCLCPP_WARN(logger_, "Unable to access node because the owning node has been destroyed"); + } + return node; +} + +std::string StreamerFactoryInterface::create_viewer( + const async_web_server_cpp::HttpRequest & request) +{ + std::stringstream ss; + ss << ""; + return ss.str(); +} + +std::vector StreamerFactoryInterface::get_available_topics( + rclcpp::Node & /* node */) +{ + return {}; +} + +} // namespace web_video_server diff --git a/src/h264_streamer.cpp b/src/streamers/h264_streamer.cpp similarity index 74% rename from src/h264_streamer.cpp rename to src/streamers/h264_streamer.cpp index 05e487f..36c1877 100644 --- a/src/h264_streamer.cpp +++ b/src/streamers/h264_streamer.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -27,15 +27,35 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include "web_video_server/h264_streamer.hpp" +#include "web_video_server/streamers/h264_streamer.hpp" + +extern "C" +{ +#include +#include +#include +#include +} + +#include +#include + +#include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/node.hpp" + +#include "web_video_server/streamer.hpp" +#include "web_video_server/streamers/libav_streamer.hpp" namespace web_video_server { +namespace streamers +{ H264Streamer::H264Streamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: LibavStreamer(request, connection, node, "mp4", "libx264", "video/mp4") + async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) +: LibavStreamerBase(request, connection, node, "h264_streamer", "mp4", "libx264", "video/mp4") { /* possible quality presets: * ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow, placebo @@ -48,7 +68,7 @@ H264Streamer::~H264Streamer() { } -void H264Streamer::initializeEncoder() +void H264Streamer::initialize_encoder() { av_opt_set(codec_context_->priv_data, "preset", preset_.c_str(), 0); av_opt_set(codec_context_->priv_data, "tune", "zerolatency", 0); @@ -64,17 +84,19 @@ void H264Streamer::initializeEncoder() } } -H264StreamerType::H264StreamerType() -: LibavStreamerType("mp4", "libx264", "video/mp4") -{ -} - -std::shared_ptr H264StreamerType::create_streamer( +std::shared_ptr H264StreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node) + rclcpp::Node::WeakPtr node) { return std::make_shared(request, connection, node); } +} // namespace streamers } // namespace web_video_server + +#include "pluginlib/class_list_macros.hpp" + +PLUGINLIB_EXPORT_CLASS( + web_video_server::streamers::H264StreamerFactory, + web_video_server::StreamerFactoryInterface) diff --git a/src/image_streamer.cpp b/src/streamers/image_transport_streamer.cpp similarity index 58% rename from src/image_streamer.cpp rename to src/streamers/image_transport_streamer.cpp index 564e1dc..c7c462d 100644 --- a/src/image_streamer.cpp +++ b/src/streamers/image_transport_streamer.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -28,35 +28,70 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include "web_video_server/image_streamer.hpp" +#include "web_video_server/streamers/image_transport_streamer.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include #ifdef CV_BRIDGE_USES_OLD_HEADERS -#include +#include "cv_bridge/cv_bridge.h" #else -#include +#include "cv_bridge/cv_bridge.hpp" #endif -#include +#include "async_web_server_cpp/http_connection.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "image_transport/image_transport.hpp" +#include "image_transport/transport_hints.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/logging.hpp" +#include "rmw/qos_profiles.h" +#include "sensor_msgs/msg/image.hpp" + +#include "web_video_server/streamer.hpp" +#include "web_video_server/utils.hpp" namespace web_video_server { +namespace streamers +{ -ImageStreamer::ImageStreamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: connection_(connection), request_(request), node_(node), inactive_(false) +namespace { - topic_ = request.get_query_param_value_or_default("topic", ""); -} -ImageStreamer::~ImageStreamer() +std::vector get_image_topics(rclcpp::Node & node) { + std::vector result; + auto topic_names_and_types = node.get_topic_names_and_types(); + for (const auto & topic_and_types : topic_names_and_types) { + for (const auto & type : topic_and_types.second) { + if (type == "sensor_msgs/msg/Image") { + result.push_back(topic_and_types.first); + break; + } + } + } + return result; } -ImageTransportImageStreamer::ImageTransportImageStreamer( +} // namespace + +ImageTransportStreamerBase::ImageTransportStreamerBase( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageStreamer(request, connection, node), it_(node), initialized_(false) + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node, + std::string logger_name) +: StreamerInterface(request, connection, node, logger_name), initialized_(false) { output_width_ = request.get_query_param_value_or_default("width", -1); output_height_ = request.get_query_param_value_or_default("height", -1); @@ -65,14 +100,20 @@ ImageTransportImageStreamer::ImageTransportImageStreamer( qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default"); } -ImageTransportImageStreamer::~ImageTransportImageStreamer() +ImageTransportStreamerBase::~ImageTransportStreamerBase() { } -void ImageTransportImageStreamer::start() +void ImageTransportStreamerBase::start() { - image_transport::TransportHints hints(node_.get(), default_transport_); - auto tnat = node_->get_topic_names_and_types(); + auto node = lock_node(); + if (!node) { + inactive_ = true; + return; + } + + image_transport::TransportHints hints(node.get(), default_transport_); + auto tnat = node->get_topic_names_and_types(); inactive_ = true; for (auto topic_and_types : tnat) { if (topic_and_types.second.size() > 1) { @@ -88,86 +129,58 @@ void ImageTransportImageStreamer::start() // Get QoS profile from query parameter RCLCPP_INFO( - node_->get_logger(), "Streaming topic %s with QoS profile %s", topic_.c_str(), + logger_, "Streaming topic %s with QoS profile %s", topic_.c_str(), qos_profile_name_.c_str()); auto qos_profile = get_qos_profile_from_name(qos_profile_name_); if (!qos_profile) { qos_profile = rmw_qos_profile_default; RCLCPP_ERROR( - node_->get_logger(), + logger_, "Invalid QoS profile %s specified. Using default profile.", qos_profile_name_.c_str()); } // Create subscriber image_sub_ = image_transport::create_subscription( - node_.get(), topic_, - std::bind(&ImageTransportImageStreamer::imageCallback, this, std::placeholders::_1), + node.get(), topic_, + std::bind(&ImageTransportStreamerBase::image_callback, this, std::placeholders::_1), default_transport_, qos_profile.value()); } -void ImageTransportImageStreamer::initialize(const cv::Mat &) +void ImageTransportStreamerBase::initialize(const cv::Mat &) { } -void ImageTransportImageStreamer::restreamFrame(std::chrono::duration max_age) +void ImageTransportStreamerBase::restream_frame(std::chrono::duration/* max_age */) { if (inactive_ || !initialized_) { return; } - try { - if (last_frame_ + max_age < std::chrono::steady_clock::now()) { - std::scoped_lock lock(send_mutex_); - // don't update last_frame, it may remain an old value. - sendImage(output_size_image, std::chrono::steady_clock::now()); - } - } catch (boost::system::system_error & e) { - // happens when client disconnects - RCLCPP_DEBUG(node_->get_logger(), "system_error exception: %s", e.what()); - inactive_ = true; - return; - } catch (std::exception & e) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "exception: %s", e.what()); - inactive_ = true; - return; - } catch (...) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "exception"); + + auto node = lock_node(); + if (!node) { inactive_ = true; return; } -} -cv::Mat ImageTransportImageStreamer::decodeImage( - const sensor_msgs::msg::Image::ConstSharedPtr & msg) -{ - if (msg->encoding.find("F") != std::string::npos) { - // scale floating point images - cv::Mat float_image_bridge = cv_bridge::toCvCopy(msg, msg->encoding)->image; - cv::Mat_ float_image = float_image_bridge; - double max_val; - cv::minMaxIdx(float_image, 0, &max_val); - - if (max_val > 0) { - float_image *= (255 / max_val); - } - return float_image; - } else { - // Convert to OpenCV native BGR color - return cv_bridge::toCvCopy(msg, "bgr8")->image; - } + try_send_image(output_size_image, last_frame_, *node); } -void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::ConstSharedPtr & msg) +void ImageTransportStreamerBase::image_callback(const sensor_msgs::msg::Image::ConstSharedPtr & msg) { if (inactive_) { return; } + auto node = lock_node(); + if (!node) { + inactive_ = true; + return; + } + cv::Mat img; try { - img = decodeImage(msg); + img = decode_image(msg); int input_width = img.cols; int input_height = img.rows; @@ -200,33 +213,78 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C } last_frame_ = std::chrono::steady_clock::now(); - sendImage(output_size_image, last_frame_); } catch (cv_bridge::Exception & e) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "cv_bridge exception: %s", e.what()); + auto & clk = *node->get_clock(); + RCLCPP_ERROR_THROTTLE(logger_, clk, 40, "cv_bridge exception: %s", e.what()); inactive_ = true; return; } catch (cv::Exception & e) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "cv_bridge exception: %s", e.what()); + auto & clk = *node->get_clock(); + RCLCPP_ERROR_THROTTLE(logger_, clk, 40, "OpenCV exception: %s", e.what()); inactive_ = true; return; + } + + try_send_image(output_size_image, last_frame_, *node); +} + +void ImageTransportStreamerBase::try_send_image( + const cv::Mat & img, + const std::chrono::steady_clock::time_point & /* time */, + rclcpp::Node & node) +{ + try { + std::scoped_lock lock(send_mutex_); + send_image(img, std::chrono::steady_clock::now()); } catch (boost::system::system_error & e) { // happens when client disconnects - RCLCPP_DEBUG(node_->get_logger(), "system_error exception: %s", e.what()); + RCLCPP_DEBUG(logger_, "system_error exception: %s", e.what()); inactive_ = true; return; } catch (std::exception & e) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "exception: %s", e.what()); + auto & clk = *node.get_clock(); + RCLCPP_ERROR_THROTTLE(logger_, clk, 40, "exception: %s", e.what()); inactive_ = true; return; } catch (...) { - auto & clk = *node_->get_clock(); - RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "exception"); + auto & clk = *node.get_clock(); + RCLCPP_ERROR_THROTTLE(logger_, clk, 40, "exception"); inactive_ = true; return; } } +cv::Mat ImageTransportStreamerBase::decode_image( + const sensor_msgs::msg::Image::ConstSharedPtr & msg) +{ + if (msg->encoding.find("F") != std::string::npos) { + // scale floating point images + cv::Mat float_image_bridge = cv_bridge::toCvCopy(msg, msg->encoding)->image; + cv::Mat_ float_image = float_image_bridge; + double max_val; + cv::minMaxIdx(float_image, 0, &max_val); + + if (max_val > 0) { + float_image *= (255 / max_val); + } + return float_image; + } else { + // Convert to OpenCV native BGR color + return cv_bridge::toCvCopy(msg, "bgr8")->image; + } +} + +std::vector ImageTransportStreamerFactoryBase::get_available_topics( + rclcpp::Node & node) +{ + return get_image_topics(node); +} + +std::vector ImageTransportSnapshotStreamerFactoryBase::get_available_topics( + rclcpp::Node & node) +{ + return get_image_topics(node); +} + +} // namespace streamers } // namespace web_video_server diff --git a/src/jpeg_streamers.cpp b/src/streamers/jpeg_streamers.cpp similarity index 66% rename from src/jpeg_streamers.cpp rename to src/streamers/jpeg_streamers.cpp index bfc6c62..8daf04a 100644 --- a/src/jpeg_streamers.cpp +++ b/src/streamers/jpeg_streamers.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -28,29 +28,50 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include "web_video_server/jpeg_streamers.hpp" +#include "web_video_server/streamers/jpeg_streamers.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "async_web_server_cpp/http_connection.hpp" #include "async_web_server_cpp/http_reply.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/node.hpp" + +#include "web_video_server/streamer.hpp" +#include "web_video_server/streamers/image_transport_streamer.hpp" namespace web_video_server { +namespace streamers +{ MjpegStreamer::MjpegStreamer( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) -: ImageTransportImageStreamer(request, connection, node), + async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node) +: ImageTransportStreamerBase(request, connection, node, "mjpeg_streamer"), stream_(connection) { quality_ = request.get_query_param_value_or_default("quality", 95); - stream_.sendInitialHeader(); + stream_.send_initial_header(); } MjpegStreamer::~MjpegStreamer() { this->inactive_ = true; - std::scoped_lock lock(send_mutex_); // protects sendImage. + std::scoped_lock lock(send_mutex_); // protects send_image. } -void MjpegStreamer::sendImage( +void MjpegStreamer::send_image( const cv::Mat & img, const std::chrono::steady_clock::time_point & time) { @@ -58,34 +79,25 @@ void MjpegStreamer::sendImage( encode_params.push_back(cv::IMWRITE_JPEG_QUALITY); encode_params.push_back(quality_); - std::vector encoded_buffer; + std::vector encoded_buffer; cv::imencode(".jpeg", img, encoded_buffer, encode_params); - stream_.sendPartAndClear(time, "image/jpeg", encoded_buffer); + stream_.send_part_and_clear(time, "image/jpeg", encoded_buffer); } -std::shared_ptr MjpegStreamerType::create_streamer( +std::shared_ptr MjpegStreamerFactory::create_streamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node) + rclcpp::Node::WeakPtr node) { return std::make_shared(request, connection, node); } -std::string MjpegStreamerType::create_viewer(const async_web_server_cpp::HttpRequest & request) -{ - std::stringstream ss; - ss << ""; - return ss.str(); -} - JpegSnapshotStreamer::JpegSnapshotStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node) -: ImageTransportImageStreamer(request, connection, node) + rclcpp::Node::WeakPtr node) +: ImageTransportStreamerBase(request, connection, node, "jpeg_snapshot_streamer") { quality_ = request.get_query_param_value_or_default("quality", 95); } @@ -93,10 +105,10 @@ JpegSnapshotStreamer::JpegSnapshotStreamer( JpegSnapshotStreamer::~JpegSnapshotStreamer() { this->inactive_ = true; - std::scoped_lock lock(send_mutex_); // protects sendImage. + std::scoped_lock lock(send_mutex_); // protects send_image. } -void JpegSnapshotStreamer::sendImage( +void JpegSnapshotStreamer::send_image( const cv::Mat & img, const std::chrono::steady_clock::time_point & time) { @@ -104,7 +116,7 @@ void JpegSnapshotStreamer::sendImage( encode_params.push_back(cv::IMWRITE_JPEG_QUALITY); encode_params.push_back(quality_); - std::vector encoded_buffer; + std::vector encoded_buffer; cv::imencode(".jpeg", img, encoded_buffer, encode_params); char stamp[20]; @@ -127,4 +139,22 @@ void JpegSnapshotStreamer::sendImage( inactive_ = true; } +std::shared_ptr JpegSnapshotStreamerFactory::create_streamer( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + rclcpp::Node::WeakPtr node) +{ + return std::make_shared(request, connection, std::move(node)); +} + +} // namespace streamers } // namespace web_video_server + +#include "pluginlib/class_list_macros.hpp" + +PLUGINLIB_EXPORT_CLASS( + web_video_server::streamers::MjpegStreamerFactory, + web_video_server::StreamerFactoryInterface) +PLUGINLIB_EXPORT_CLASS( + web_video_server::streamers::JpegSnapshotStreamerFactory, + web_video_server::SnapshotStreamerFactoryInterface) diff --git a/src/libav_streamer.cpp b/src/streamers/libav_streamer.cpp similarity index 84% rename from src/libav_streamer.cpp rename to src/streamers/libav_streamer.cpp index 327a919..b11ccf6 100644 --- a/src/libav_streamer.cpp +++ b/src/streamers/libav_streamer.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2014, Worcester Polytechnic Institute -// Copyright (c) 2024, The Robot Web Tools Contributors +// Copyright (c) 2024-2025, The Robot Web Tools Contributors // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -28,8 +28,44 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include "web_video_server/libav_streamer.hpp" +#include "web_video_server/streamers/libav_streamer.hpp" + +extern "C" +{ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +} + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "async_web_server_cpp/http_connection.hpp" #include "async_web_server_cpp/http_reply.hpp" +#include "async_web_server_cpp/http_request.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/logging.hpp" + +#include "web_video_server/streamers/image_transport_streamer.hpp" // https://stackoverflow.com/questions/46884682/error-in-building-opencv-with-ffmpeg #define AV_CODEC_FLAG_GLOBAL_HEADER (1 << 22) @@ -37,13 +73,15 @@ namespace web_video_server { +namespace streamers +{ -LibavStreamer::LibavStreamer( +LibavStreamerBase::LibavStreamerBase( const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node, - const std::string & format_name, const std::string & codec_name, + async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::WeakPtr node, + std::string logger_name, const std::string & format_name, const std::string & codec_name, const std::string & content_type) -: ImageTransportImageStreamer(request, connection, node), format_context_(0), codec_(0), +: ImageTransportStreamerBase(request, connection, node, logger_name), format_context_(0), codec_(0), codec_context_(0), video_stream_(0), opt_(0), frame_(0), sws_context_(0), first_image_received_(false), first_image_time_(), format_name_(format_name), codec_name_(codec_name), content_type_(content_type), io_buffer_(0) @@ -54,7 +92,7 @@ LibavStreamer::LibavStreamer( gop_ = request.get_query_param_value_or_default("gop", 25); } -LibavStreamer::~LibavStreamer() +LibavStreamerBase::~LibavStreamerBase() { if (codec_context_) { avcodec_free_context(&codec_context_); @@ -91,7 +129,7 @@ static int dispatch_output_packet(void * opaque, const uint8_t * buffer, int buf return 0; } -void LibavStreamer::initialize(const cv::Mat & /* img */) +void LibavStreamerBase::initialize(const cv::Mat & /* img */) { // Load format format_context_ = avformat_alloc_context(); @@ -166,7 +204,7 @@ void LibavStreamer::initialize(const cv::Mat & /* img */) codec_context_->flags |= AV_CODEC_FLAG_LOW_DELAY; - initializeEncoder(); + initialize_encoder(); avcodec_parameters_from_context(video_stream_->codecpar, codec_context_); @@ -215,11 +253,7 @@ void LibavStreamer::initialize(const cv::Mat & /* img */) } } -void LibavStreamer::initializeEncoder() -{ -} - -void LibavStreamer::sendImage( +void LibavStreamerBase::send_image( const cv::Mat & img, const std::chrono::steady_clock::time_point & time) { @@ -248,7 +282,7 @@ void LibavStreamer::sendImage( } - int ret = sws_scale( + sws_scale( sws_context_, (const uint8_t * const *)raw_frame->data, raw_frame->linesize, 0, output_height_, frame_->data, frame_->linesize); @@ -258,11 +292,11 @@ void LibavStreamer::sendImage( // Encode the frame AVPacket * pkt = av_packet_alloc(); - ret = avcodec_send_frame(codec_context_, frame_); + int ret = avcodec_send_frame(codec_context_, frame_); if (ret == AVERROR_EOF) { - RCLCPP_DEBUG_STREAM(node_->get_logger(), "avcodec_send_frame() encoder flushed\n"); + RCLCPP_DEBUG_STREAM(logger_, "avcodec_send_frame() encoder flushed\n"); } else if (ret == AVERROR(EAGAIN)) { - RCLCPP_DEBUG_STREAM(node_->get_logger(), "avcodec_send_frame() need output read out\n"); + RCLCPP_DEBUG_STREAM(logger_, "avcodec_send_frame() need output read out\n"); } if (ret < 0) { throw std::runtime_error("Error encoding video frame"); @@ -271,9 +305,9 @@ void LibavStreamer::sendImage( ret = avcodec_receive_packet(codec_context_, pkt); bool got_packet = pkt->size > 0; if (ret == AVERROR_EOF) { - RCLCPP_DEBUG_STREAM(node_->get_logger(), "avcodec_receive_packet() encoder flushed\n"); + RCLCPP_DEBUG_STREAM(logger_, "avcodec_receive_packet() encoder flushed\n"); } else if (ret == AVERROR(EAGAIN)) { - RCLCPP_DEBUG_STREAM(node_->get_logger(), "avcodec_receive_packet() needs more input\n"); + RCLCPP_DEBUG_STREAM(logger_, "avcodec_receive_packet() needs more input\n"); got_packet = false; } @@ -301,24 +335,8 @@ void LibavStreamer::sendImage( av_packet_unref(pkt); } -LibavStreamerType::LibavStreamerType( - const std::string & format_name, const std::string & codec_name, - const std::string & content_type) -: format_name_(format_name), codec_name_(codec_name), content_type_(content_type) -{ -} - -std::shared_ptr LibavStreamerType::create_streamer( - const async_web_server_cpp::HttpRequest & request, - async_web_server_cpp::HttpConnectionPtr connection, - rclcpp::Node::SharedPtr node) -{ - return std::make_shared( - request, connection, node, format_name_, codec_name_, - content_type_); -} - -std::string LibavStreamerType::create_viewer(const async_web_server_cpp::HttpRequest & request) +std::string LibavStreamerFactoryBase::create_viewer( + const async_web_server_cpp::HttpRequest & request) { std::stringstream ss; ss << "