Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ The following parameters can be added to the stream URL:
| `invert` | flag | not present | present/not present | Invert image when parameter is present |
| `default_transport` | string | "raw" | "raw", "compressed", "theora" | Image transport to use |
| `qos_profile` | string | "default" | "default", "system_default", "sensor_data", "services_default" | QoS profile for ROS 2 subscribers |
| `client_id` | string | "" | Any identifier | Optional tag you can later pass to `/shutdown` to close only that client’s stream |

Examples:

Expand All @@ -161,6 +162,17 @@ It is also possible to get a single image snapshot
```
http://localhost:8080/snapshot?topic=/camera/image_raw
```

### Stop an Active Stream

If you embed the MJPEG feed in a UI, call the server’s `/shutdown` endpoint when your component unmounts so the socket on port 8080 closes immediately:


http://localhost:8080/shutdown?topic=/camera/image_raw&client_id=my-ui


`topic` is required and must match the stream URL. Supplying the same `client_id` value you used when opening `/stream` (via `?client_id=my-ui`) lets you shut down only that client; omit it to stop every active stream on the topic. The endpoint responds with `text/plain` (`stopped=<count>`) so you can log/verify that the MJPEG session was torn down.

#### URL Parameters for Snapshot

| Parameter | Type | Default | Possible Values | Description |
Expand Down
9 changes: 9 additions & 0 deletions include/web_video_server/image_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,19 @@ class ImageStreamer
rclcpp::Node::SharedPtr node);

virtual void start() = 0;
virtual void stop();
virtual ~ImageStreamer();

bool isInactive()
{
return inactive_;
}

const std::string & getClientId() const
{
return client_id_;
}

/**
* Restreams the last received image frame if older than max_age.
*/
Expand All @@ -79,6 +85,7 @@ class ImageStreamer
bool inactive_;
image_transport::Subscriber image_sub_;
std::string topic_;
std::string client_id_;
};


Expand All @@ -92,6 +99,8 @@ class ImageTransportImageStreamer : public ImageStreamer
virtual ~ImageTransportImageStreamer();

virtual void start();
void stop() override;


protected:
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);
Expand Down
6 changes: 6 additions & 0 deletions include/web_video_server/web_video_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ class WebVideoServer : public rclcpp::Node
async_web_server_cpp::HttpConnectionPtr connection,
const char * begin, const char * end);

bool handle_shutdown(
const async_web_server_cpp::HttpRequest & request,
async_web_server_cpp::HttpConnectionPtr connection,
const char * begin, const char * end);


private:
void restreamFrames(std::chrono::duration<double> max_age);
void cleanup_inactive_streams();
Expand Down
21 changes: 21 additions & 0 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,25 @@ ImageStreamer::ImageStreamer(
: connection_(connection), request_(request), node_(node), inactive_(false)
{
topic_ = request.get_query_param_value_or_default("topic", "");
client_id_ = request.get_query_param_value_or_default("client_id", "");
}

ImageStreamer::~ImageStreamer()
{
}

void ImageStreamer::stop()
{
inactive_ = true;
try {
if (connection_) {
connection_->socket().close();
}
} catch (const std::exception & e) {
RCLCPP_WARN(node_->get_logger(), "Failed to close MJPEG connection: %s", e.what());
}
}

ImageTransportImageStreamer::ImageTransportImageStreamer(
const async_web_server_cpp::HttpRequest & request,
async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node)
Expand Down Expand Up @@ -106,6 +119,14 @@ void ImageTransportImageStreamer::start()
default_transport_, qos_profile.value());
}

void ImageTransportImageStreamer::stop()
{
ImageStreamer::stop();
if (image_sub_) {
image_sub_.shutdown();
}
}

void ImageTransportImageStreamer::initialize(const cv::Mat &)
{
}
Expand Down
51 changes: 51 additions & 0 deletions src/web_video_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options)
handler_group_.addHandlerForPath(
"/snapshot",
boost::bind(&WebVideoServer::handle_snapshot, this, _1, _2, _3, _4));
handler_group_.addHandlerForPath(
"/shutdown",
boost::bind(&WebVideoServer::handle_shutdown, this, _1, _2, _3, _4));

try {
server_.reset(
Expand Down Expand Up @@ -370,6 +373,54 @@ bool WebVideoServer::handle_list_streams(
return true;
}

bool WebVideoServer::handle_shutdown(
const async_web_server_cpp::HttpRequest & request,
async_web_server_cpp::HttpConnectionPtr connection, const char * begin,
const char * end)
{
std::string topic = request.get_query_param_value_or_default("topic", "");
std::string client_id = request.get_query_param_value_or_default("client_id", "");

auto matches_topic = [&topic](const std::string & candidate) {
if (topic.empty()) {
return true;
}
return candidate == topic ||
(topic[0] != '/' && candidate == "/" + topic);
};

std::vector<std::shared_ptr<ImageStreamer>> to_stop;
{
std::scoped_lock lock(subscriber_mutex_);
for (auto & streamer : image_subscribers_) {
if (!streamer || streamer->isInactive()) {
continue;
}
if (matches_topic(streamer->getTopic()) &&
(client_id.empty() || streamer->getClientId() == client_id))
{
to_stop.push_back(streamer);
}
}
}

for (auto & streamer : to_stop) {
streamer->stop();
}

if (verbose_) {
RCLCPP_INFO(
get_logger(), "Shutdown request removed %zu stream(s) for topic %s",
to_stop.size(), topic.empty() ? "<all>" : topic.c_str());
}

std::string body = "stopped=" + std::to_string(to_stop.size());
async_web_server_cpp::HttpReply::static_reply(
async_web_server_cpp::HttpReply::ok, "text/plain", body)(
request, connection, begin, end);
return true;
}

} // namespace web_video_server

#include "rclcpp_components/register_node_macro.hpp"
Expand Down