diff --git a/README.md b/README.md index 2738767..979ad4f 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,7 @@ The following parameters can be added to the stream URL: | `invert` | flag | not present | present/not present | Invert image when parameter is present | | `default_transport` | string | "raw" | "raw", "compressed", "theora" | Image transport to use | | `qos_profile` | string | "default" | "default", "system_default", "sensor_data", "services_default" | QoS profile for ROS 2 subscribers | +| `client_id` | string | "" | Any identifier | Optional tag you can later pass to `/shutdown` to close only that client’s stream | Examples: @@ -161,6 +162,17 @@ It is also possible to get a single image snapshot ``` http://localhost:8080/snapshot?topic=/camera/image_raw ``` + +### Stop an Active Stream + + If you embed the MJPEG feed in a UI, call the server’s `/shutdown` endpoint when your component unmounts so the socket on port 8080 closes immediately: + + + http://localhost:8080/shutdown?topic=/camera/image_raw&client_id=my-ui + + + `topic` is required and must match the stream URL. Supplying the same `client_id` value you used when opening `/stream` (via `?client_id=my-ui`) lets you shut down only that client; omit it to stop every active stream on the topic. The endpoint responds with `text/plain` (`stopped=`) so you can log/verify that the MJPEG session was torn down. + #### URL Parameters for Snapshot | Parameter | Type | Default | Possible Values | Description | diff --git a/include/web_video_server/image_streamer.hpp b/include/web_video_server/image_streamer.hpp index 0e001c5..22b94e5 100644 --- a/include/web_video_server/image_streamer.hpp +++ b/include/web_video_server/image_streamer.hpp @@ -55,6 +55,7 @@ class ImageStreamer rclcpp::Node::SharedPtr node); virtual void start() = 0; + virtual void stop(); virtual ~ImageStreamer(); bool isInactive() @@ -62,6 +63,11 @@ class ImageStreamer return inactive_; } + const std::string & getClientId() const + { + return client_id_; + } + /** * Restreams the last received image frame if older than max_age. */ @@ -79,6 +85,7 @@ class ImageStreamer bool inactive_; image_transport::Subscriber image_sub_; std::string topic_; + std::string client_id_; }; @@ -92,6 +99,8 @@ class ImageTransportImageStreamer : public ImageStreamer virtual ~ImageTransportImageStreamer(); virtual void start(); + void stop() override; + protected: virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg); diff --git a/include/web_video_server/web_video_server.hpp b/include/web_video_server/web_video_server.hpp index 6e9bd95..f026998 100644 --- a/include/web_video_server/web_video_server.hpp +++ b/include/web_video_server/web_video_server.hpp @@ -93,6 +93,12 @@ class WebVideoServer : public rclcpp::Node async_web_server_cpp::HttpConnectionPtr connection, const char * begin, const char * end); + bool handle_shutdown( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, + const char * begin, const char * end); + + private: void restreamFrames(std::chrono::duration max_age); void cleanup_inactive_streams(); diff --git a/src/image_streamer.cpp b/src/image_streamer.cpp index 564e1dc..2ba351a 100644 --- a/src/image_streamer.cpp +++ b/src/image_streamer.cpp @@ -47,12 +47,25 @@ ImageStreamer::ImageStreamer( : connection_(connection), request_(request), node_(node), inactive_(false) { topic_ = request.get_query_param_value_or_default("topic", ""); + client_id_ = request.get_query_param_value_or_default("client_id", ""); } ImageStreamer::~ImageStreamer() { } +void ImageStreamer::stop() +{ + inactive_ = true; + try { + if (connection_) { + connection_->socket().close(); + } + } catch (const std::exception & e) { + RCLCPP_WARN(node_->get_logger(), "Failed to close MJPEG connection: %s", e.what()); + } +} + ImageTransportImageStreamer::ImageTransportImageStreamer( const async_web_server_cpp::HttpRequest & request, async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node) @@ -106,6 +119,14 @@ void ImageTransportImageStreamer::start() default_transport_, qos_profile.value()); } +void ImageTransportImageStreamer::stop() +{ + ImageStreamer::stop(); + if (image_sub_) { + image_sub_.shutdown(); + } +} + void ImageTransportImageStreamer::initialize(const cv::Mat &) { } diff --git a/src/web_video_server.cpp b/src/web_video_server.cpp index 24db2fb..8df6c39 100644 --- a/src/web_video_server.cpp +++ b/src/web_video_server.cpp @@ -91,6 +91,9 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options) handler_group_.addHandlerForPath( "/snapshot", boost::bind(&WebVideoServer::handle_snapshot, this, _1, _2, _3, _4)); + handler_group_.addHandlerForPath( + "/shutdown", + boost::bind(&WebVideoServer::handle_shutdown, this, _1, _2, _3, _4)); try { server_.reset( @@ -370,6 +373,54 @@ bool WebVideoServer::handle_list_streams( return true; } +bool WebVideoServer::handle_shutdown( + const async_web_server_cpp::HttpRequest & request, + async_web_server_cpp::HttpConnectionPtr connection, const char * begin, + const char * end) +{ + std::string topic = request.get_query_param_value_or_default("topic", ""); + std::string client_id = request.get_query_param_value_or_default("client_id", ""); + + auto matches_topic = [&topic](const std::string & candidate) { + if (topic.empty()) { + return true; + } + return candidate == topic || + (topic[0] != '/' && candidate == "/" + topic); + }; + + std::vector> to_stop; + { + std::scoped_lock lock(subscriber_mutex_); + for (auto & streamer : image_subscribers_) { + if (!streamer || streamer->isInactive()) { + continue; + } + if (matches_topic(streamer->getTopic()) && + (client_id.empty() || streamer->getClientId() == client_id)) + { + to_stop.push_back(streamer); + } + } + } + + for (auto & streamer : to_stop) { + streamer->stop(); + } + + if (verbose_) { + RCLCPP_INFO( + get_logger(), "Shutdown request removed %zu stream(s) for topic %s", + to_stop.size(), topic.empty() ? "" : topic.c_str()); + } + + std::string body = "stopped=" + std::to_string(to_stop.size()); + async_web_server_cpp::HttpReply::static_reply( + async_web_server_cpp::HttpReply::ok, "text/plain", body)( + request, connection, begin, end); + return true; +} + } // namespace web_video_server #include "rclcpp_components/register_node_macro.hpp"