Skip to content

Commit 4333d93

Browse files
authored
Consumer classes: Add target layer retransmission buffer to avoid PLIs/FIRs when RTP packets containing a key frame arrive out of order (#1550)
1 parent afea333 commit 4333d93

File tree

15 files changed

+447
-32
lines changed

15 files changed

+447
-32
lines changed

.github/workflows/mediasoup-worker.yaml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,26 @@ jobs:
5353
run-test-asan-address: true
5454
run-test-asan-undefined: false
5555
run-test-asan-thread: true
56+
- os: ubuntu-24.04
57+
cc: gcc
58+
cxx: g++
59+
pip-break-system-packages: true
60+
run-lint: true
61+
run-test: true
62+
run-test-asan-address: true
63+
run-test-asan-undefined: false
64+
run-test-asan-thread: true
65+
# Let's just compile with all Meson option flags enabled once with gcc.
66+
meson_args: '-Dms_log_trace=true -Dms_log_file_line=true -Dms_rtc_logger_rtp=true -Dms_disable_liburing=true -Dms_sctp_stack=true'
67+
- os: ubuntu-24.04
68+
cc: clang
69+
cxx: clang++
70+
pip-break-system-packages: true
71+
run-lint: true
72+
run-test: true
73+
run-test-asan-address: true
74+
run-test-asan-undefined: true
75+
run-test-asan-thread: true
5676
- os: ubuntu-24.04
5777
cc: clang
5878
cxx: clang++
@@ -62,6 +82,8 @@ jobs:
6282
run-test-asan-address: true
6383
run-test-asan-undefined: true
6484
run-test-asan-thread: true
85+
# Let's just compile with all Meson option flags enabled once with clang.
86+
meson_args: '-Dms_log_trace=true -Dms_log_file_line=true -Dms_rtc_logger_rtp=true -Dms_disable_liburing=true -Dms_sctp_stack=true'
6587
- os: ubuntu-24.04-arm
6688
cc: gcc
6789
cxx: g++
@@ -147,6 +169,7 @@ jobs:
147169
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
148170
MEDIASOUP_LOCAL_DEV: 'false'
149171
MEDIASOUP_BUILDTYPE: ${{ matrix.build-type }}
172+
MESON_ARGS: ${{ matrix.build.meson_args }}
150173

151174
steps:
152175
- name: Checkout

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
- libuv: Update to v1.51.0 ([PR #1543](https://github.com/versatica/mediasoup/pull/1543)).
66
- libsrtp: Update to v3.0.0-beta version in our fork ([PR #1544](https://github.com/versatica/mediasoup/pull/1544)).
7-
- `XxxxConsumer.cpp`: Only drop packets in RTP sequence manager when they belong to current spatial layer ([PR #1549](https://github.com/versatica/mediasoup/pull/1549)).
7+
- `Consumer` classes: Only drop packets in RTP sequence manager when they belong to current spatial layer ([PR #1549](https://github.com/versatica/mediasoup/pull/1549)).
8+
- `Consumer` classes: Add target layer retransmission buffer to avoid PLIs/FIRs when RTP packets containing a key frame arrive out of order ([PR #1550](https://github.com/versatica/mediasoup/pull/1550)).
89

910
### 3.16.0
1011

@@ -785,7 +786,7 @@ Migrate `npm-scripts.js` to `npm-scripts.mjs` (ES Module) ([PR #1093](https://gi
785786

786787
### 3.6.35
787788

788-
- `XxxxConsumer.hpp`: make `IsActive()` return `true` (even if `Producer`'s score is 0) when DTX is enabled ([PR #534](https://github.com/versatica/mediasoup/pull/534) due to issue #532).
789+
- `Consumer` classes: make `IsActive()` return `true` (even if `Producer`'s score is 0) when DTX is enabled ([PR #534](https://github.com/versatica/mediasoup/pull/534) due to issue #532).
789790

790791
### 3.6.34
791792

worker/include/RTC/PipeConsumer.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "RTC/Consumer.hpp"
55
#include "RTC/SeqManager.hpp"
66
#include "RTC/Shared.hpp"
7+
#include <map>
78

89
namespace RTC
910
{
@@ -59,6 +60,11 @@ namespace RTC
5960
void UserOnResumed() override;
6061
void CreateRtpStreams();
6162
void RequestKeyFrame();
63+
void StorePacketInTargetLayerRetransmissionBuffer(
64+
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>&
65+
targetLayerRetransmissionBuffer,
66+
RTC::RtpPacket* packet,
67+
std::shared_ptr<RTC::RtpPacket>& sharedPacket);
6268

6369
/* Pure virtual methods inherited from RtpStreamSend::Listener. */
6470
public:
@@ -75,6 +81,12 @@ namespace RTC
7581
absl::flat_hash_map<RTC::RtpStreamSend*, bool> mapRtpStreamSyncRequired;
7682
absl::flat_hash_map<RTC::RtpStreamSend*, std::unique_ptr<RTC::SeqManager<uint16_t>>>
7783
mapRtpStreamRtpSeqManager;
84+
// Buffers to store packets that arrive earlier than the first packet of the
85+
// video key frame.
86+
absl::flat_hash_map<
87+
RTC::RtpStreamSend*,
88+
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>>
89+
mapRtpStreamTargetLayerRetransmissionBuffer;
7890
};
7991
} // namespace RTC
8092

worker/include/RTC/SimpleConsumer.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "RTC/Consumer.hpp"
66
#include "RTC/SeqManager.hpp"
77
#include "RTC/Shared.hpp"
8+
#include <map>
89

910
namespace RTC
1011
{
@@ -72,6 +73,8 @@ namespace RTC
7273
void UserOnResumed() override;
7374
void CreateRtpStream();
7475
void RequestKeyFrame();
76+
void StorePacketInTargetLayerRetransmissionBuffer(
77+
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
7578
void EmitScore() const;
7679

7780
/* Pure virtual methods inherited from RtpStreamSend::Listener. */
@@ -90,6 +93,10 @@ namespace RTC
9093
std::unique_ptr<RTC::SeqManager<uint16_t>> rtpSeqManager;
9194
bool managingBitrate{ false };
9295
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
96+
// Buffer to store packets that arrive earlier than the first packet of the
97+
// video key frame.
98+
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
99+
targetLayerRetransmissionBuffer;
93100
};
94101
} // namespace RTC
95102

worker/include/RTC/SimulcastConsumer.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "RTC/Consumer.hpp"
77
#include "RTC/SeqManager.hpp"
88
#include "RTC/Shared.hpp"
9+
#include <map>
910

1011
namespace RTC
1112
{
@@ -95,6 +96,8 @@ namespace RTC
9596
void UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer);
9697
bool CanSwitchToSpatialLayer(int16_t spatialLayer) const;
9798
void EmitScore() const;
99+
void StorePacketInTargetLayerRetransmissionBuffer(
100+
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
98101
void EmitLayersChange() const;
99102
RTC::RtpStreamRecv* GetProducerCurrentRtpStream() const;
100103
RTC::RtpStreamRecv* GetProducerTargetRtpStream() const;
@@ -129,7 +132,12 @@ namespace RTC
129132
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
130133
uint32_t tsOffset{ 0u }; // RTP Timestamp offset.
131134
bool keyFrameForTsOffsetRequested{ false };
132-
uint64_t lastBweDowngradeAtMs{ 0u }; // Last time we moved to lower spatial layer due to BWE.
135+
// Last time we moved to lower spatial layer due to BWE.
136+
uint64_t lastBweDowngradeAtMs{ 0u };
137+
// Buffer to store packets that arrive earlier than the first packet of the
138+
// video key frame.
139+
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
140+
targetLayerRetransmissionBuffer;
133141
};
134142
} // namespace RTC
135143

worker/include/RTC/SvcConsumer.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ namespace RTC
8686
bool RecalculateTargetLayers(int16_t& newTargetSpatialLayer, int16_t& newTargetTemporalLayer) const;
8787
void UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer);
8888
void EmitScore() const;
89+
void StorePacketInTargetLayerRetransmissionBuffer(
90+
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
8991
void EmitLayersChange() const;
9092

9193
/* Pure virtual methods inherited from RtpStreamSend::Listener. */
@@ -106,7 +108,12 @@ namespace RTC
106108
int16_t provisionalTargetSpatialLayer{ -1 };
107109
int16_t provisionalTargetTemporalLayer{ -1 };
108110
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
109-
uint64_t lastBweDowngradeAtMs{ 0u }; // Last time we moved to lower spatial layer due to BWE.
111+
// Last time we moved to lower spatial layer due to BWE.
112+
uint64_t lastBweDowngradeAtMs{ 0u };
113+
// Buffer to store packets that arrive earlier than the first packet of the
114+
// video key frame.
115+
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
116+
targetLayerRetransmissionBuffer;
110117
};
111118
} // namespace RTC
112119

worker/meson_options.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
option('ms_log_trace', type : 'boolean', value : false, description : 'When set to true, logs the current method/function if current log level is "debug"')
22
option('ms_log_file_line', type : 'boolean', value : false, description : 'When set to true, all the logging macros print more verbose information, including current file and line')
3-
option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each RTP packet')
3+
option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each processed RTP packet')
44
option('ms_disable_liburing', type : 'boolean', value : false, description : 'When set to true, disables liburing integration despite current host supports it')
55
option('ms_sctp_stack', type : 'boolean', value : false, description : 'When set to true, uses mediasoup SCTP stack instead of usrsctp library')

worker/src/RTC/PipeConsumer.cpp

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,16 @@
66
#include "MediaSoupErrors.hpp"
77
#include "Utils.hpp"
88
#include "RTC/Codecs/Tools.hpp"
9+
#ifdef MS_RTC_LOGGER_RTP
10+
#include "RTC/RtcLogger.hpp"
11+
#endif
912

1013
namespace RTC
1114
{
15+
/* Static. */
16+
17+
static constexpr size_t TargetLayerRetransmissionBufferSize{ 15u };
18+
1219
/* Instance methods. */
1320

1421
PipeConsumer::PipeConsumer(
@@ -52,9 +59,13 @@ namespace RTC
5259
{
5360
delete rtpStream;
5461
}
62+
5563
this->rtpStreams.clear();
5664
this->mapMappedSsrcSsrc.clear();
5765
this->mapSsrcRtpStream.clear();
66+
this->mapRtpStreamSyncRequired.clear();
67+
this->mapRtpStreamRtpSeqManager.clear();
68+
this->mapRtpStreamTargetLayerRetransmissionBuffer.clear();
5869
}
5970

6071
flatbuffers::Offset<FBS::Consumer::DumpResponse> PipeConsumer::FillBuffer(
@@ -226,6 +237,8 @@ namespace RTC
226237
auto* rtpStream = this->mapSsrcRtpStream.at(ssrc);
227238
auto& syncRequired = this->mapRtpStreamSyncRequired.at(rtpStream);
228239
auto& rtpSeqManager = this->mapRtpStreamRtpSeqManager.at(rtpStream);
240+
auto& targetLayerRetransmissionBuffer =
241+
this->mapRtpStreamTargetLayerRetransmissionBuffer.at(rtpStream);
229242

230243
if (!IsActive())
231244
{
@@ -250,6 +263,11 @@ namespace RTC
250263
// we are blocking all packets but the key frame that would trigger sync
251264
// below.
252265

266+
// Store the packet for the scenario in which this packet is part of the
267+
// key frame and it arrived before the first packet of the key frame.
268+
StorePacketInTargetLayerRetransmissionBuffer(
269+
targetLayerRetransmissionBuffer, packet, sharedPacket);
270+
253271
return;
254272
}
255273

@@ -285,6 +303,10 @@ namespace RTC
285303
// Whether this is the first packet after re-sync.
286304
const bool isSyncPacket = syncRequired;
287305

306+
// Whether packets stored in the target layer retransmission buffer must be
307+
// sent once this packet is sent.
308+
bool sendPacketsInTargetLayerRetransmissionBuffer{ false };
309+
288310
// Sync sequence number and timestamp if required.
289311
if (isSyncPacket)
290312
{
@@ -296,6 +318,8 @@ namespace RTC
296318
packet->GetSsrc(),
297319
packet->GetSequenceNumber(),
298320
packet->GetTimestamp());
321+
322+
sendPacketsInTargetLayerRetransmissionBuffer = true;
299323
}
300324

301325
rtpSeqManager->Sync(packet->GetSequenceNumber() - 1);
@@ -334,8 +358,9 @@ namespace RTC
334358
origSeq);
335359
}
336360

337-
// Process the packet.
338-
if (rtpStream->ReceivePacket(packet, sharedPacket))
361+
const bool sent = rtpStream->ReceivePacket(packet, sharedPacket);
362+
363+
if (sent)
339364
{
340365
// Send the packet.
341366
this->listener->OnConsumerSendRtpPacket(this, packet);
@@ -363,6 +388,37 @@ namespace RTC
363388
// Restore packet fields.
364389
packet->SetSsrc(origSsrc);
365390
packet->SetSequenceNumber(origSeq);
391+
392+
// If sent packet was the first packet of a key frame, let's send buffered
393+
// packets belonging to the same key frame that arrived earlier due to
394+
// packet misorder.
395+
if (sendPacketsInTargetLayerRetransmissionBuffer)
396+
{
397+
// NOTE: Only send buffered packets if the first packet containing the key
398+
// frame was sent.
399+
if (sent)
400+
{
401+
for (auto& kv : targetLayerRetransmissionBuffer)
402+
{
403+
auto& bufferedSharedPacket = kv.second;
404+
auto* bufferedPacket = bufferedSharedPacket.get();
405+
406+
if (bufferedPacket->GetSequenceNumber() > origSeq)
407+
{
408+
MS_DEBUG_DEV(
409+
"sending packet buffered in the target layer retransmission buffer [ssrc:%" PRIu32
410+
", seq:%" PRIu16 ", ts:%" PRIu32 "]",
411+
bufferedPacket->GetSsrc(),
412+
bufferedPacket->GetSequenceNumber(),
413+
bufferedPacket->GetTimestamp());
414+
415+
SendRtpPacket(bufferedPacket, bufferedSharedPacket);
416+
}
417+
}
418+
}
419+
420+
targetLayerRetransmissionBuffer.clear();
421+
}
366422
}
367423

368424
bool PipeConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
@@ -574,6 +630,13 @@ namespace RTC
574630
{
575631
rtpStream->Pause();
576632
}
633+
634+
for (auto& kv : this->mapRtpStreamTargetLayerRetransmissionBuffer)
635+
{
636+
auto& targetLayerRetransmissionBuffer = kv.second;
637+
638+
targetLayerRetransmissionBuffer.clear();
639+
}
577640
}
578641

579642
void PipeConsumer::UserOnPaused()
@@ -584,6 +647,13 @@ namespace RTC
584647
{
585648
rtpStream->Pause();
586649
}
650+
651+
for (auto& kv : this->mapRtpStreamTargetLayerRetransmissionBuffer)
652+
{
653+
auto& targetLayerRetransmissionBuffer = kv.second;
654+
655+
targetLayerRetransmissionBuffer.clear();
656+
}
587657
}
588658

589659
void PipeConsumer::UserOnResumed()
@@ -707,6 +777,8 @@ namespace RTC
707777

708778
this->mapRtpStreamRtpSeqManager[rtpStream].reset(
709779
new RTC::SeqManager<uint16_t>(initialOutputSeq));
780+
781+
this->mapRtpStreamTargetLayerRetransmissionBuffer[rtpStream];
710782
}
711783
}
712784

@@ -727,6 +799,36 @@ namespace RTC
727799
}
728800
}
729801

802+
void PipeConsumer::StorePacketInTargetLayerRetransmissionBuffer(
803+
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>&
804+
targetLayerRetransmissionBuffer,
805+
RTC::RtpPacket* packet,
806+
std::shared_ptr<RTC::RtpPacket>& sharedPacket)
807+
{
808+
MS_TRACE();
809+
810+
MS_DEBUG_DEV(
811+
"storing packet in target layer retransmission buffer [ssrc:%" PRIu32 ", seq:%" PRIu16
812+
", ts:%" PRIu32 "]",
813+
packet->GetSsrc(),
814+
packet->GetSequenceNumber(),
815+
packet->GetTimestamp());
816+
817+
// Store original packet into the buffer. Only clone once and only if
818+
// necessary.
819+
if (!sharedPacket)
820+
{
821+
sharedPacket.reset(packet->Clone());
822+
}
823+
824+
targetLayerRetransmissionBuffer[packet->GetSequenceNumber()] = sharedPacket;
825+
826+
if (targetLayerRetransmissionBuffer.size() > TargetLayerRetransmissionBufferSize)
827+
{
828+
targetLayerRetransmissionBuffer.erase(targetLayerRetransmissionBuffer.begin());
829+
}
830+
}
831+
730832
void PipeConsumer::OnRtpStreamScore(
731833
RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/)
732834
{

0 commit comments

Comments
 (0)