Skip to content

Commit 241cad5

Browse files
committed
issue-193: add shutdown handler for client to call and close alive sockets
1 parent 9eaa343 commit 241cad5

File tree

5 files changed

+100
-3
lines changed

5 files changed

+100
-3
lines changed

README.md

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,7 @@ cd ~/ros_ws/src
4545
Clone this repository:
4646
```bash
4747
# ROS 2
48-
git clone https://github.com/RobotWebTools/web_video_server.git
49-
# ROS 1
50-
git clone https://github.com/RobotWebTools/web_video_server.git -b ros1
48+
git clone [email protected]:emulatebio/web-video-server.git
5149
```
5250

5351
Install dependencies with rosdep:
@@ -141,6 +139,7 @@ The following parameters can be added to the stream URL:
141139
| `invert` | flag | not present | present/not present | Invert image when parameter is present |
142140
| `default_transport` | string | "raw" | "raw", "compressed", "theora" | Image transport to use |
143141
| `qos_profile` | string | "default" | "default", "system_default", "sensor_data", "services_default" | QoS profile for ROS 2 subscribers |
142+
| `client_id` | string | "" | Any identifier | Optional tag you can later pass to `/shutdown` to close only that client’s stream |
144143

145144
Examples:
146145

@@ -161,6 +160,17 @@ It is also possible to get a single image snapshot
161160
```
162161
http://localhost:8080/snapshot?topic=/camera/image_raw
163162
```
163+
164+
### Stop an Active Stream
165+
166+
If you embed the MJPEG feed in a UI, call the server’s `/shutdown` endpoint when your component unmounts so the socket on port 8080 closes immediately:
167+
168+
169+
http://localhost:8080/shutdown?topic=/camera/image_raw&client_id=my-ui
170+
171+
172+
`topic` is required and must match the stream URL. Supplying the same `client_id` value you used when opening `/stream` (via `?client_id=my-ui`) lets you shut down only that client; omit it to stop every active stream on the topic. The endpoint responds with `text/plain` (`stopped=<count>`) so you can log/verify that the MJPEG session was torn down.
173+
164174
#### URL Parameters for Snapshot
165175

166176
| Parameter | Type | Default | Possible Values | Description |

include/web_video_server/image_streamer.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,19 @@ class ImageStreamer
5555
rclcpp::Node::SharedPtr node);
5656

5757
virtual void start() = 0;
58+
virtual void stop();
5859
virtual ~ImageStreamer();
5960

6061
bool isInactive()
6162
{
6263
return inactive_;
6364
}
6465

66+
const std::string & getClientId() const
67+
{
68+
return client_id_;
69+
}
70+
6571
/**
6672
* Restreams the last received image frame if older than max_age.
6773
*/
@@ -79,6 +85,7 @@ class ImageStreamer
7985
bool inactive_;
8086
image_transport::Subscriber image_sub_;
8187
std::string topic_;
88+
std::string client_id_;
8289
};
8390

8491

@@ -92,6 +99,8 @@ class ImageTransportImageStreamer : public ImageStreamer
9299
virtual ~ImageTransportImageStreamer();
93100

94101
virtual void start();
102+
void stop() override;
103+
95104

96105
protected:
97106
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);

include/web_video_server/web_video_server.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ class WebVideoServer : public rclcpp::Node
9393
async_web_server_cpp::HttpConnectionPtr connection,
9494
const char * begin, const char * end);
9595

96+
bool handle_shutdown(
97+
const async_web_server_cpp::HttpRequest & request,
98+
async_web_server_cpp::HttpConnectionPtr connection,
99+
const char * begin, const char * end);
100+
101+
96102
private:
97103
void restreamFrames(std::chrono::duration<double> max_age);
98104
void cleanup_inactive_streams();

src/image_streamer.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,25 @@ ImageStreamer::ImageStreamer(
4747
: connection_(connection), request_(request), node_(node), inactive_(false)
4848
{
4949
topic_ = request.get_query_param_value_or_default("topic", "");
50+
client_id_ = request.get_query_param_value_or_default("client_id", "");
5051
}
5152

5253
ImageStreamer::~ImageStreamer()
5354
{
5455
}
5556

57+
void ImageStreamer::stop()
58+
{
59+
inactive_ = true;
60+
try {
61+
if (connection_) {
62+
connection_->socket().close();
63+
}
64+
} catch (const std::exception & e) {
65+
RCLCPP_WARN(node_->get_logger(), "Failed to close MJPEG connection: %s", e.what());
66+
}
67+
}
68+
5669
ImageTransportImageStreamer::ImageTransportImageStreamer(
5770
const async_web_server_cpp::HttpRequest & request,
5871
async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node)
@@ -106,6 +119,14 @@ void ImageTransportImageStreamer::start()
106119
default_transport_, qos_profile.value());
107120
}
108121

122+
void ImageTransportImageStreamer::stop()
123+
{
124+
ImageStreamer::stop();
125+
if (image_sub_) {
126+
image_sub_.shutdown();
127+
}
128+
}
129+
109130
void ImageTransportImageStreamer::initialize(const cv::Mat &)
110131
{
111132
}

src/web_video_server.cpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options)
9191
handler_group_.addHandlerForPath(
9292
"/snapshot",
9393
boost::bind(&WebVideoServer::handle_snapshot, this, _1, _2, _3, _4));
94+
handler_group_.addHandlerForPath(
95+
"/shutdown",
96+
boost::bind(&WebVideoServer::handle_shutdown, this, _1, _2, _3, _4));
9497

9598
try {
9699
server_.reset(
@@ -370,6 +373,54 @@ bool WebVideoServer::handle_list_streams(
370373
return true;
371374
}
372375

376+
bool WebVideoServer::handle_shutdown(
377+
const async_web_server_cpp::HttpRequest & request,
378+
async_web_server_cpp::HttpConnectionPtr connection, const char * begin,
379+
const char * end)
380+
{
381+
std::string topic = request.get_query_param_value_or_default("topic", "");
382+
std::string client_id = request.get_query_param_value_or_default("client_id", "");
383+
384+
auto matches_topic = [&topic](const std::string & candidate) {
385+
if (topic.empty()) {
386+
return true;
387+
}
388+
return candidate == topic ||
389+
(topic[0] != '/' && candidate == "/" + topic);
390+
};
391+
392+
std::vector<std::shared_ptr<ImageStreamer>> to_stop;
393+
{
394+
std::scoped_lock lock(subscriber_mutex_);
395+
for (auto & streamer : image_subscribers_) {
396+
if (!streamer || streamer->isInactive()) {
397+
continue;
398+
}
399+
if (matches_topic(streamer->getTopic()) &&
400+
(client_id.empty() || streamer->getClientId() == client_id))
401+
{
402+
to_stop.push_back(streamer);
403+
}
404+
}
405+
}
406+
407+
for (auto & streamer : to_stop) {
408+
streamer->stop();
409+
}
410+
411+
if (verbose_) {
412+
RCLCPP_INFO(
413+
get_logger(), "Shutdown request removed %zu stream(s) for topic %s",
414+
to_stop.size(), topic.empty() ? "<all>" : topic.c_str());
415+
}
416+
417+
std::string body = "stopped=" + std::to_string(to_stop.size());
418+
async_web_server_cpp::HttpReply::static_reply(
419+
async_web_server_cpp::HttpReply::ok, "text/plain", body)(
420+
request, connection, begin, end);
421+
return true;
422+
}
423+
373424
} // namespace web_video_server
374425

375426
#include "rclcpp_components/register_node_macro.hpp"

0 commit comments

Comments
 (0)