Skip to content

Commit 8e828bb

Browse files
author
Viktor Kunovski
authored
Restream buffered frames with minimum publish rate (#88)
* Restream buffered frames with minimum publish rate * Implement restreaming for ros_compressed_streamer
1 parent ef37d0b commit 8e828bb

10 files changed

+179
-28
lines changed

include/web_video_server/image_streamer.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@ class ImageStreamer
1818
ros::NodeHandle& nh);
1919

2020
virtual void start() = 0;
21+
virtual ~ImageStreamer();
2122

2223
bool isInactive()
2324
{
2425
return inactive_;
2526
}
2627
;
2728

29+
/**
30+
* Restreams the last received image frame if older than max_age.
31+
*/
32+
virtual void restreamFrame(double max_age) = 0;
33+
2834
std::string getTopic()
2935
{
3036
return topic_;
@@ -45,19 +51,24 @@ class ImageTransportImageStreamer : public ImageStreamer
4551
public:
4652
ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
4753
ros::NodeHandle& nh);
48-
54+
virtual ~ImageTransportImageStreamer();
4955
virtual void start();
5056

5157
protected:
5258
virtual void sendImage(const cv::Mat &, const ros::Time &time) = 0;
53-
59+
virtual void restreamFrame(double max_age);
5460
virtual void initialize(const cv::Mat &);
5561

5662
image_transport::Subscriber image_sub_;
5763
int output_width_;
5864
int output_height_;
5965
bool invert_;
6066
std::string default_transport_;
67+
68+
ros::Time last_frame;
69+
cv::Mat output_size_image;
70+
boost::mutex send_mutex_;
71+
6172
private:
6273
image_transport::ImageTransport it_;
6374
bool initialized_;

include/web_video_server/jpeg_streamers.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class MjpegStreamer : public ImageTransportImageStreamer
1515
public:
1616
MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
1717
ros::NodeHandle& nh);
18-
18+
~MjpegStreamer();
1919
protected:
2020
virtual void sendImage(const cv::Mat &, const ros::Time &time);
2121

@@ -38,7 +38,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer
3838
public:
3939
JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
4040
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);
41-
41+
~JpegSnapshotStreamer();
4242
protected:
4343
virtual void sendImage(const cv::Mat &, const ros::Time &time);
4444

include/web_video_server/png_streamers.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class PngStreamer : public ImageTransportImageStreamer
1515
public:
1616
PngStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
1717
ros::NodeHandle& nh);
18-
18+
~PngStreamer();
1919
protected:
2020
virtual void sendImage(const cv::Mat &, const ros::Time &time);
2121

@@ -38,7 +38,7 @@ class PngSnapshotStreamer : public ImageTransportImageStreamer
3838
public:
3939
PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
4040
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);
41-
41+
~PngSnapshotStreamer();
4242
protected:
4343
virtual void sendImage(const cv::Mat &, const ros::Time &time);
4444

include/web_video_server/ros_compressed_streamer.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,21 @@ class RosCompressedStreamer : public ImageStreamer
1515
public:
1616
RosCompressedStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
1717
ros::NodeHandle& nh);
18+
~RosCompressedStreamer();
19+
1820
virtual void start();
21+
virtual void restreamFrame(double max_age);
22+
23+
protected:
24+
virtual void sendImage(const sensor_msgs::CompressedImageConstPtr &msg, const ros::Time &time);
1925

2026
private:
2127
void imageCallback(const sensor_msgs::CompressedImageConstPtr &msg);
22-
2328
MultipartStream stream_;
2429
ros::Subscriber image_sub_;
30+
ros::Time last_frame;
31+
sensor_msgs::CompressedImageConstPtr last_msg;
32+
boost::mutex send_mutex_;
2533
};
2634

2735
class RosCompressedStreamerType : public ImageStreamerType

include/web_video_server/web_video_server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class WebVideoServer
5050
async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end);
5151

5252
private:
53+
void restreamFrames(double max_age);
5354
void cleanup_inactive_streams();
5455

5556
ros::NodeHandle nh_;
@@ -59,6 +60,7 @@ class WebVideoServer
5960
ros::Timer cleanup_timer_;
6061
#endif
6162
int ros_threads_;
63+
double publish_rate_;
6264
int port_;
6365
std::string address_;
6466
boost::shared_ptr<async_web_server_cpp::HttpServer> server_;

src/image_streamer.cpp

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "web_video_server/image_streamer.h"
22
#include <cv_bridge/cv_bridge.h>
3+
#include <iostream>
34

45
namespace web_video_server
56
{
@@ -11,6 +12,10 @@ ImageStreamer::ImageStreamer(const async_web_server_cpp::HttpRequest &request,
1112
topic_ = request.get_query_param_value_or_default("topic", "");
1213
}
1314

15+
ImageStreamer::~ImageStreamer()
16+
{
17+
}
18+
1419
ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request,
1520
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) :
1621
ImageStreamer(request, connection, nh), it_(nh), initialized_(false)
@@ -21,6 +26,10 @@ ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_
2126
default_transport_ = request.get_query_param_value_or_default("default_transport", "raw");
2227
}
2328

29+
ImageTransportImageStreamer::~ImageTransportImageStreamer()
30+
{
31+
}
32+
2433
void ImageTransportImageStreamer::start()
2534
{
2635
image_transport::TransportHints hints(default_transport_);
@@ -41,6 +50,37 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &)
4150
{
4251
}
4352

53+
void ImageTransportImageStreamer::restreamFrame(double max_age)
54+
{
55+
if (inactive_ || !initialized_ )
56+
return;
57+
try {
58+
if ( last_frame + ros::Duration(max_age) < ros::Time::now() ) {
59+
boost::mutex::scoped_lock lock(send_mutex_);
60+
sendImage(output_size_image, ros::Time::now() ); // don't update last_frame, it may remain an old value.
61+
}
62+
}
63+
catch (boost::system::system_error &e)
64+
{
65+
// happens when client disconnects
66+
ROS_DEBUG("system_error exception: %s", e.what());
67+
inactive_ = true;
68+
return;
69+
}
70+
catch (std::exception &e)
71+
{
72+
ROS_ERROR_THROTTLE(30, "exception: %s", e.what());
73+
inactive_ = true;
74+
return;
75+
}
76+
catch (...)
77+
{
78+
ROS_ERROR_THROTTLE(30, "exception");
79+
inactive_ = true;
80+
return;
81+
}
82+
}
83+
4484
void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr &msg)
4585
{
4686
if (inactive_)
@@ -84,7 +124,7 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr
84124
cv::flip(img, img, true);
85125
}
86126

87-
cv::Mat output_size_image;
127+
boost::mutex::scoped_lock lock(send_mutex_); // protects output_size_image
88128
if (output_width_ != input_width || output_height_ != input_height)
89129
{
90130
cv::Mat img_resized;
@@ -102,7 +142,9 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr
102142
initialize(output_size_image);
103143
initialized_ = true;
104144
}
105-
sendImage(output_size_image, msg->header.stamp);
145+
146+
last_frame = ros::Time::now();
147+
sendImage(output_size_image, last_frame );
106148

107149
}
108150
catch (cv_bridge::Exception &e)

src/jpeg_streamers.cpp

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ MjpegStreamer::MjpegStreamer(const async_web_server_cpp::HttpRequest &request,
1212
stream_.sendInitialHeader();
1313
}
1414

15+
MjpegStreamer::~MjpegStreamer()
16+
{
17+
this->inactive_ = true;
18+
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
19+
}
20+
1521
void MjpegStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
1622
{
1723
std::vector<int> encode_params;
@@ -48,6 +54,12 @@ JpegSnapshotStreamer::JpegSnapshotStreamer(const async_web_server_cpp::HttpReque
4854
quality_ = request.get_query_param_value_or_default<int>("quality", 95);
4955
}
5056

57+
JpegSnapshotStreamer::~JpegSnapshotStreamer()
58+
{
59+
this->inactive_ = true;
60+
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
61+
}
62+
5163
void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
5264
{
5365
std::vector<int> encode_params;
@@ -59,13 +71,19 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
5971

6072
char stamp[20];
6173
sprintf(stamp, "%.06lf", time.toSec());
62-
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
63-
"Server", "web_video_server").header("Cache-Control",
64-
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header(
65-
"X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/jpeg").header(
66-
"Access-Control-Allow-Origin", "*").header("Content-Length",
67-
boost::lexical_cast<std::string>(encoded_buffer.size())).write(
68-
connection_);
74+
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
75+
.header("Connection", "close")
76+
.header("Server", "web_video_server")
77+
.header("Cache-Control",
78+
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, "
79+
"max-age=0")
80+
.header("X-Timestamp", stamp)
81+
.header("Pragma", "no-cache")
82+
.header("Content-type", "image/jpeg")
83+
.header("Access-Control-Allow-Origin", "*")
84+
.header("Content-Length",
85+
boost::lexical_cast<std::string>(encoded_buffer.size()))
86+
.write(connection_);
6987
connection_->write_and_clear(encoded_buffer);
7088
inactive_ = true;
7189
}

src/png_streamers.cpp

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ PngStreamer::PngStreamer(const async_web_server_cpp::HttpRequest &request,
1212
stream_.sendInitialHeader();
1313
}
1414

15+
PngStreamer::~PngStreamer()
16+
{
17+
this->inactive_ = true;
18+
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
19+
}
20+
1521
void PngStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
1622
{
1723
std::vector<int> encode_params;
@@ -48,6 +54,12 @@ PngSnapshotStreamer::PngSnapshotStreamer(const async_web_server_cpp::HttpRequest
4854
quality_ = request.get_query_param_value_or_default<int>("quality", 3);
4955
}
5056

57+
PngSnapshotStreamer::~PngSnapshotStreamer()
58+
{
59+
this->inactive_ = true;
60+
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
61+
}
62+
5163
void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
5264
{
5365
std::vector<int> encode_params;
@@ -59,13 +71,19 @@ void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
5971

6072
char stamp[20];
6173
sprintf(stamp, "%.06lf", time.toSec());
62-
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
63-
"Server", "web_video_server").header("Cache-Control",
64-
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header(
65-
"X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/png").header(
66-
"Access-Control-Allow-Origin", "*").header("Content-Length",
67-
boost::lexical_cast<std::string>(encoded_buffer.size())).write(
68-
connection_);
74+
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
75+
.header("Connection", "close")
76+
.header("Server", "web_video_server")
77+
.header("Cache-Control",
78+
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, "
79+
"max-age=0")
80+
.header("X-Timestamp", stamp)
81+
.header("Pragma", "no-cache")
82+
.header("Content-type", "image/png")
83+
.header("Access-Control-Allow-Origin", "*")
84+
.header("Content-Length",
85+
boost::lexical_cast<std::string>(encoded_buffer.size()))
86+
.write(connection_);
6987
connection_->write_and_clear(encoded_buffer);
7088
inactive_ = true;
7189
}

src/ros_compressed_streamer.cpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,30 @@ RosCompressedStreamer::RosCompressedStreamer(const async_web_server_cpp::HttpReq
1010
stream_.sendInitialHeader();
1111
}
1212

13+
RosCompressedStreamer::~RosCompressedStreamer()
14+
{
15+
this->inactive_ = true;
16+
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
17+
}
18+
1319
void RosCompressedStreamer::start() {
1420
std::string compressed_topic = topic_ + "/compressed";
1521
image_sub_ = nh_.subscribe(compressed_topic, 1, &RosCompressedStreamer::imageCallback, this);
1622
}
1723

18-
void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageConstPtr &msg) {
24+
void RosCompressedStreamer::restreamFrame(double max_age)
25+
{
26+
if (inactive_ || (last_msg == 0))
27+
return;
28+
29+
if ( last_frame + ros::Duration(max_age) < ros::Time::now() ) {
30+
boost::mutex::scoped_lock lock(send_mutex_);
31+
sendImage(last_msg, ros::Time::now() ); // don't update last_frame, it may remain an old value.
32+
}
33+
}
34+
35+
void RosCompressedStreamer::sendImage(const sensor_msgs::CompressedImageConstPtr &msg,
36+
const ros::Time &time) {
1937
try {
2038
std::string content_type;
2139
if(msg->format.find("jpeg") != std::string::npos) {
@@ -29,7 +47,7 @@ void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageCons
2947
return;
3048
}
3149

32-
stream_.sendPart(msg->header.stamp, content_type, boost::asio::buffer(msg->data), msg);
50+
stream_.sendPart(time, content_type, boost::asio::buffer(msg->data), msg);
3351
}
3452
catch (boost::system::system_error &e)
3553
{
@@ -53,6 +71,14 @@ void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageCons
5371
}
5472

5573

74+
void RosCompressedStreamer::imageCallback(const sensor_msgs::CompressedImageConstPtr &msg) {
75+
boost::mutex::scoped_lock lock(send_mutex_); // protects last_msg and last_frame
76+
last_msg = msg;
77+
last_frame = ros::Time(msg->header.stamp.sec, msg->header.stamp.nsec);
78+
sendImage(last_msg, last_frame);
79+
}
80+
81+
5682
boost::shared_ptr<ImageStreamer> RosCompressedStreamerType::create_streamer(const async_web_server_cpp::HttpRequest &request,
5783
async_web_server_cpp::HttpConnectionPtr connection,
5884
ros::NodeHandle& nh)

0 commit comments

Comments
 (0)