Skip to content

Commit f663f07

Browse files
authored
Merge pull request #21 from themactep/feature/rtsp-stability-improvements
Feature/rtsp stability improvements
2 parents d8e9707 + 48a7d01 commit f663f07

12 files changed

Lines changed: 144 additions & 35 deletions

src/AudioWorker.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include <algorithm>
99
#include <cerrno>
10+
#include <chrono>
1011
#include <cstring>
1112
#include <fcntl.h>
1213
#include <string>
@@ -309,11 +310,18 @@ void AudioWorker::process_audio_frame(IMPAudioFrame &frame) {
309310
}
310311
}
311312
if (!delivered) {
312-
#if defined(USE_AUDIO_STREAM_REPLICATOR)
313-
LOG_DDEBUG("audio encChn:" << encChn << ", size:" << af.data.size() << " clogged!");
314-
#else
315-
LOG_ERROR("audio encChn:" << encChn << ", size:" << af.data.size() << " clogged!");
316-
#endif
313+
static uint32_t clog_count = 0;
314+
static uint64_t clog_last_log_ms = 0;
315+
clog_count++;
316+
uint64_t now_ms = static_cast<uint64_t>(
317+
std::chrono::duration_cast<std::chrono::milliseconds>(
318+
std::chrono::steady_clock::now().time_since_epoch()).count());
319+
if (now_ms - clog_last_log_ms >= 5000) {
320+
LOG_WARN("audio encChn:" << encChn << " - msgChannel sink clogged, "
321+
<< clog_count << " frames dropped in last 5s");
322+
clog_count = 0;
323+
clog_last_log_ms = now_ms;
324+
}
317325
}
318326
}
319327

src/Config.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,9 +536,10 @@ std::vector<ConfigItem<int>> CFG::getIntItems() {
536536
{"recorder.prebuffer_max_memory_mb", recorder.prebuffer_max_memory_mb, 2, [](const int &v) { return v >= 1 && v <= 8; }},
537537
#endif
538538
{"rtsp.est_bitrate", rtsp.est_bitrate, 5000, validateIntGe0},
539-
{"rtsp.out_buffer_size", rtsp.out_buffer_size, 500000, validateIntGe0},
539+
{"rtsp.out_buffer_size", rtsp.out_buffer_size, 1048576, validateIntGe0},
540540
{"rtsp.port", rtsp.port, 554, validateInt65535},
541541
{"rtsp.send_buffer_size", rtsp.send_buffer_size, 307200, validateIntGe0},
542+
{"rtsp.send_timeout", rtsp.send_timeout, 5, validateIntGe0},
542543
{"rtsp.session_reclaim", rtsp.session_reclaim, 65, validateIntGe0},
543544
{"sensor.i2c_bus", sensor.i2c_bus, 0, validateIntGe0, false, "/proc/jz/sensor/i2c_bus"},
544545
// TODO: set default fps to the maximum supported by the SoC via HAL

src/Config.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ struct _rtsp {
291291
int est_bitrate;
292292
int out_buffer_size;
293293
int send_buffer_size;
294+
int send_timeout;
294295
int session_reclaim;
295296
bool auth_required;
296297
const char *username;

src/IMPDeviceSource.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,31 @@ template <typename FrameType, typename Stream> void IMPDeviceSource<FrameType, S
8888
gettimeofday(&fPresentationTime, NULL);
8989
}
9090
} else {
91-
gettimeofday(&fPresentationTime, NULL);
91+
// Video: use encoder timestamps for stable, jitter-free PTS.
92+
// The IMP encoder provides a monotonic microsecond timestamp per NAL.
93+
// We anchor it to wall-clock at the first frame, then derive all
94+
// subsequent presentation times from the encoder's own clock.
95+
if (videoFirstFrame) {
96+
gettimeofday(&videoBaseTime, NULL);
97+
videoFirstImpTs = nal.imp_ts;
98+
videoFirstFrame = false;
99+
}
100+
int64_t delta_us = nal.imp_ts - videoFirstImpTs;
101+
// Guard against timestamp wrap/reset (negative delta) or
102+
// impossible forward jumps (> 2s in one frame = encoder restart).
103+
if (delta_us < 0 || (delta_us > 0 && videoLastDelta >= 0 &&
104+
(delta_us - videoLastDelta) > 2000000LL)) {
105+
gettimeofday(&videoBaseTime, NULL);
106+
videoFirstImpTs = nal.imp_ts;
107+
delta_us = 0;
108+
}
109+
videoLastDelta = delta_us;
110+
fPresentationTime.tv_sec = videoBaseTime.tv_sec + static_cast<time_t>(delta_us / 1000000LL);
111+
fPresentationTime.tv_usec = videoBaseTime.tv_usec + static_cast<suseconds_t>(delta_us % 1000000LL);
112+
if (fPresentationTime.tv_usec >= 1000000) {
113+
fPresentationTime.tv_sec++;
114+
fPresentationTime.tv_usec -= 1000000;
115+
}
92116
}
93117

94118
memcpy(fTo, &nal.data[0], fFrameSize);

src/IMPDeviceSource.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ template <typename FrameType, typename Stream> class IMPDeviceSource : public Fr
3434
bool audioFirstFrame{true};
3535
struct timeval audioStartTime{};
3636
uint64_t audioFrameCount{0};
37+
38+
// Video encoder timestamp tracking (avoids gettimeofday jitter)
39+
bool videoFirstFrame{true};
40+
int64_t videoFirstImpTs{0};
41+
int64_t videoLastDelta{-1};
42+
struct timeval videoBaseTime{};
3743
};
3844

3945
#endif

src/IMPServerMediaSubsession.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "IMPDeviceSource.hpp"
99
#include <iostream>
1010
#include <memory>
11+
#include <sys/socket.h>
1112

1213
// Modify method to accept pointers for the NAL units
1314
IMPServerMediaSubsession *IMPServerMediaSubsession::createNew(UsageEnvironment &env,
@@ -48,6 +49,15 @@ FramedSource *IMPServerMediaSubsession::createNewStreamSource(unsigned clientSes
4849
RTPSink *IMPServerMediaSubsession::createNewRTPSink(Groupsock *rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic,
4950
FramedSource *fs) {
5051
increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), cfg->rtsp.send_buffer_size);
52+
53+
// Set send timeout to detect and disconnect stalled RTSP clients
54+
// (inspired by go2rtc which uses a 5s write deadline on TCP sockets)
55+
if (cfg->rtsp.send_timeout > 0) {
56+
struct timeval tv;
57+
tv.tv_sec = cfg->rtsp.send_timeout;
58+
tv.tv_usec = 0;
59+
setsockopt(rtpGroupsock->socketNum(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
60+
}
5161
// Use VPS only if it's available (non-nullptr, and we are in H265 mode)
5262
if (vps) {
5363
return H265VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, &vps->data[0],
@@ -63,3 +73,36 @@ RTPSink *IMPServerMediaSubsession::createNewRTPSink(Groupsock *rtpGroupsock, uns
6373
// not only the first sdp is used
6474
// delete[] fSDPLines; fSDPLines = NULL;
6575
}
76+
77+
char const *IMPServerMediaSubsession::sdpLines(int addressFamily) {
78+
// Check if encoder codec config (SPS/PPS/VPS) has changed since last SDP.
79+
// If so, update our copies and invalidate cached SDP so live555 regenerates it.
80+
// This enables dynamic resolution/profile changes without RTSP server restart.
81+
if (encChn >= 0 && encChn < NUM_VIDEO_CHANNELS && global_video[encChn]) {
82+
std::lock_guard<std::mutex> lock(global_video[encChn]->codec_config_mutex);
83+
bool changed = false;
84+
85+
if (global_video[encChn]->have_sps && global_video[encChn]->latest_sps != lastKnownSps) {
86+
sps.data = global_video[encChn]->latest_sps;
87+
lastKnownSps = global_video[encChn]->latest_sps;
88+
changed = true;
89+
}
90+
if (global_video[encChn]->have_pps && global_video[encChn]->latest_pps != lastKnownPps) {
91+
pps.data = global_video[encChn]->latest_pps;
92+
lastKnownPps = global_video[encChn]->latest_pps;
93+
changed = true;
94+
}
95+
if (vps && global_video[encChn]->have_vps && global_video[encChn]->latest_vps != lastKnownVps) {
96+
vps->data = global_video[encChn]->latest_vps;
97+
lastKnownVps = global_video[encChn]->latest_vps;
98+
changed = true;
99+
}
100+
101+
if (changed) {
102+
delete[] fSDPLines;
103+
fSDPLines = NULL;
104+
}
105+
}
106+
107+
return OnDemandServerMediaSubsession::sdpLines(addressFamily);
108+
}

src/IMPServerMediaSubsession.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ class IMPServerMediaSubsession : public OnDemandServerMediaSubsession {
2626
virtual RTPSink *createNewRTPSink(Groupsock *rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic,
2727
FramedSource *inputSource);
2828

29+
// Override sdpLines to refresh SDP when encoder SPS/PPS change
30+
virtual char const *sdpLines(int addressFamily) override;
31+
2932
virtual void startStream(unsigned clientSessionId, void *streamToken, TaskFunc *rtcpRRHandler,
3033
void *rtcpRRHandlerClientData, unsigned short &rtpSeqNum, unsigned &rtpTimestamp,
3134
ServerRequestAlternativeByteHandler *serverRequestAlternativeByteHandler,
@@ -54,6 +57,11 @@ class IMPServerMediaSubsession : public OnDemandServerMediaSubsession {
5457
H264NALUnit sps;
5558
H264NALUnit pps;
5659
int encChn;
60+
61+
// Track whether codec config has changed since last SDP generation
62+
std::vector<uint8_t> lastKnownSps;
63+
std::vector<uint8_t> lastKnownPps;
64+
std::vector<uint8_t> lastKnownVps;
5765
};
5866

5967
#endif

src/JsonAPI.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,7 @@ void handle_rtsp(JsonValue *obj, std::string &out, bool &sep) {
13611361
add_int("est_bitrate", "rtsp.est_bitrate");
13621362
add_int("out_buffer_size", "rtsp.out_buffer_size");
13631363
add_int("send_buffer_size", "rtsp.send_buffer_size");
1364+
add_int("send_timeout", "rtsp.send_timeout");
13641365
add_bool_r("auth_required", "rtsp.auth_required");
13651366
add_str_r("name", "rtsp.name");
13661367
add_str_r("username", "rtsp.username");

src/RTSP.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,23 @@ void RTSP::addSubsession(int chnNr, _stream &stream) {
7272
LOG_INFO("Audio stream " << chnNr << " added to session");
7373
}
7474

75-
if (cfg->audio.output_enabled && stream.audio_enabled) {
76-
#define ADD_BACKCHANNEL_SUBSESSION(EnumName, NameString, PayloadType, Frequency, MimeType) \
75+
// ONVIF backchannel: add backchannel subsessions to the primary stream (ch0)
76+
// with a require tag so they only appear in the SDP when the client sends
77+
// "Require: www.onvif.org/ver20/backchannel" in the DESCRIBE request.
78+
// Per ONVIF Streaming Spec Section 5.3, backchannel tracks must be part of
79+
// the main media session, not a separate endpoint.
80+
if (chnNr == 0 && cfg->audio.output_enabled) {
81+
#define ADD_BC_SUBSESSION_CH0(EnumName, NameString, PayloadType, Frequency, MimeType) \
7782
{ \
78-
BackchannelServerMediaSubsession *backchannelSub = \
83+
BackchannelServerMediaSubsession *bcSub = \
7984
BackchannelServerMediaSubsession::createNew(*env, IMPBackchannelFormat::EnumName); \
80-
sms->addSubsession(backchannelSub); \
81-
LOG_INFO("Backchannel stream " << NameString << " added to session"); \
85+
bcSub->setRequireTag("www.onvif.org/ver20/backchannel"); \
86+
sms->addSubsession(bcSub); \
87+
LOG_INFO("Backchannel (" << NameString << ") added to ch0 (conditional on Require header)"); \
8288
}
8389

84-
X_FOREACH_BACKCHANNEL_FORMAT(ADD_BACKCHANNEL_SUBSESSION)
85-
#undef ADD_BACKCHANNEL_SUBSESSION
90+
X_FOREACH_BACKCHANNEL_FORMAT(ADD_BC_SUBSESSION_CH0)
91+
#undef ADD_BC_SUBSESSION_CH0
8692
}
8793

8894
rtspServer->addServerMediaSession(sms);

src/VideoWorker.cpp

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,10 @@ void VideoWorker::run() {
275275
continue;
276276
}
277277

278-
/* timestamp fix, can be removed if solved
279278
int64_t nal_ts = stream.pack[stream.packCount - 1].timestamp;
280279
struct timeval encoder_time;
281280
encoder_time.tv_sec = nal_ts / 1000000;
282281
encoder_time.tv_usec = nal_ts % 1000000;
283-
*/
284282

285283
for (uint32_t i = 0; i < stream.packCount; ++i) {
286284
bool recorder_active = channel_recorder.isActive();
@@ -439,10 +437,10 @@ void VideoWorker::run() {
439437
if (global_video[encChn]->hasDataCallback) {
440438
H264NALUnit nalu;
441439

442-
/* timestamp fix, can be removed if solved
443-
nalu.imp_ts = stream.pack[i].timestamp;
444-
nalu.time = encoder_time;
445-
*/
440+
// Use the frame-level timestamp (from last pack) for all NALs
441+
// in this GetStream() call. SPS/PPS packs may carry timestamp=0
442+
// which would cause a DTS discontinuity in the RTP stream.
443+
nalu.imp_ts = nal_ts;
446444

447445
// We use start+4 because the encoder inserts 4-byte MPEG
448446
// 'startcodes' at the beginning of each NAL. Live555 complains.
@@ -467,13 +465,21 @@ void VideoWorker::run() {
467465

468466
if (global_video[encChn]->idr == true) {
469467
bool delivered = false;
470-
// Use write_wait() to apply backpressure instead of silent drops
468+
// Use non-blocking write() to avoid stalling encoder on slow clients
469+
// (go2rtc-inspired: drop oldest frame rather than block producer)
471470
try {
472-
global_video[encChn]->msgChannel->write_wait(nalu);
473-
delivered = true;
474-
std::unique_lock<std::mutex> lock_stream{global_video[encChn]->onDataCallbackLock};
475-
if (global_video[encChn]->onDataCallback)
476-
global_video[encChn]->onDataCallback();
471+
delivered = global_video[encChn]->msgChannel->write(nalu);
472+
if (delivered) {
473+
std::unique_lock<std::mutex> lock_stream{global_video[encChn]->onDataCallbackLock};
474+
if (global_video[encChn]->onDataCallback)
475+
global_video[encChn]->onDataCallback();
476+
} else {
477+
LOG_DDEBUG("video channel:" << encChn << " msgChannel full, dropped oldest NAL");
478+
// Still notify so consumer processes queued data
479+
std::unique_lock<std::mutex> lock_stream{global_video[encChn]->onDataCallbackLock};
480+
if (global_video[encChn]->onDataCallback)
481+
global_video[encChn]->onDataCallback();
482+
}
477483
} catch (const std::exception& e) {
478484
LOG_ERROR("video channel:" << encChn << ", frame_id:" << nalu.frame_id
479485
<< ", packet:" << nalu.packet_index << "/" << nalu.packet_count
@@ -496,10 +502,16 @@ void VideoWorker::run() {
496502
}
497503
}
498504
if (!delivered) {
499-
LOG_ERROR("video channel:" << encChn << ", "
500-
<< "frame_id:" << nalu.frame_id << ", "
501-
<< "package:" << i << " of " << stream.packCount << ", "
502-
<< "packageSize:" << nalu.data.size() << " - msgChannel sink clogged!");
505+
static uint32_t clog_count[NUM_VIDEO_CHANNELS] = {};
506+
static uint64_t clog_last_log_ms[NUM_VIDEO_CHANNELS] = {};
507+
clog_count[encChn]++;
508+
uint64_t now_ms = monotonic_ms();
509+
if (now_ms - clog_last_log_ms[encChn] >= 5000) {
510+
LOG_WARN("video channel:" << encChn << " - msgChannel sink clogged, "
511+
<< clog_count[encChn] << " frames dropped in last 5s");
512+
clog_count[encChn] = 0;
513+
clog_last_log_ms[encChn] = now_ms;
514+
}
503515
}
504516
}
505517
#if defined(USE_AUDIO_STREAM_REPLICATOR)

0 commit comments

Comments
 (0)