Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- libuv: Update to v1.51.0 ([PR #1543](https://github.com/versatica/mediasoup/pull/1543)).
- libsrtp: Update to v3.0.0-beta version in our fork ([PR #1544](https://github.com/versatica/mediasoup/pull/1544)).
- `XxxxConsumer`: Add target layer retransmission buffer ([PR #1548](https://github.com/versatica/mediasoup/pull/1548)).

### 3.16.0

Expand Down
7 changes: 7 additions & 0 deletions worker/include/RTC/SimpleConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "RTC/Consumer.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include <map>

namespace RTC
{
Expand Down Expand Up @@ -72,6 +73,8 @@ namespace RTC
void UserOnResumed() override;
void CreateRtpStream();
void RequestKeyFrame();
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void EmitScore() const;

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
Expand All @@ -90,6 +93,10 @@ namespace RTC
std::unique_ptr<RTC::SeqManager<uint16_t>> rtpSeqManager;
bool managingBitrate{ false };
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
// Buffer to store packets that arrive earlier than the first packet of the
// video key frame.
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
85 changes: 73 additions & 12 deletions worker/src/RTC/SimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

namespace RTC
{
/* Static. */

static constexpr size_t TargetLayerRetransmissionBufferSize{ 20u };

/* Instance methods. */

SimpleConsumer::SimpleConsumer(
Expand Down Expand Up @@ -74,6 +78,7 @@ namespace RTC
this->shared->channelMessageRegistrator->UnregisterHandler(this->id);

delete this->rtpStream;
this->targetLayerRetransmissionBuffer.clear();
}

flatbuffers::Offset<FBS::Consumer::DumpResponse> SimpleConsumer::FillBuffer(
Expand Down Expand Up @@ -362,31 +367,32 @@ namespace RTC
return;
}

// If we need to sync, support key frames and this is not a key frame, ignore
// the packet.
if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame())
// Packets with only padding are not forwarded.
if (packet->GetPayloadLength() == 0)
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::NOT_A_KEYFRAME);
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::EMPTY_PAYLOAD);
#endif

this->rtpSeqManager->Drop(packet->GetSequenceNumber());

return;
}

// Packets with only padding are not forwarded.
if (packet->GetPayloadLength() == 0)
// If we need to sync, support key frames and this is not a key frame, do not
// send the packet and store it for the scenario in which this packet is part
// of the key frame and arrived before the first packet of the key frame.
if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame())
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::EMPTY_PAYLOAD);
#endif

this->rtpSeqManager->Drop(packet->GetSequenceNumber());
StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket);

return;
}

// Whether packets stored in the target layer retransmission buffer must be
// sent once this packet is sent.
bool sendPacketsInTargetLayerRetransmissionBuffer{ false };

// Whether this is the first packet after re-sync.
const bool isSyncPacket = this->syncRequired;

Expand All @@ -395,7 +401,13 @@ namespace RTC
{
if (packet->IsKeyFrame())
{
MS_DEBUG_TAG(rtp, "sync key frame received");
MS_DEBUG_TAG(
rtp,
"sync key frame received [seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSequenceNumber(),
packet->GetTimestamp());

sendPacketsInTargetLayerRetransmissionBuffer = true;
}

this->rtpSeqManager->Sync(packet->GetSequenceNumber() - 1);
Expand Down Expand Up @@ -457,6 +469,28 @@ namespace RTC
// Restore packet fields.
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);

// If sent packet was the first packet of a key frame, we may need to send
// buffered packets belonging to the same key frame that arrived earlier due
// to packet misorder.
if (sendPacketsInTargetLayerRetransmissionBuffer)
{
for (auto& pair : this->targetLayerRetransmissionBuffer)
{
auto& bufferedSharedPacket = pair.second;
auto* bufferedPacket = bufferedSharedPacket.get();

MS_DEBUG_DEV(
"sending packet buffered in the target layer retransmission buffer [seq:%" PRIu16
", ts:%" PRIu32 "]",
bufferedPacket->GetSequenceNumber(),
bufferedPacket->GetTimestamp());

SendRtpPacket(bufferedPacket, bufferedSharedPacket);
}

this->targetLayerRetransmissionBuffer.clear();
}
}

bool SimpleConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
Expand Down Expand Up @@ -607,13 +641,15 @@ namespace RTC
MS_TRACE();

this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();
}

void SimpleConsumer::UserOnPaused()
{
MS_TRACE();

this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();

if (this->externallyManagedBitrate && this->kind == RTC::Media::Kind::VIDEO)
{
Expand Down Expand Up @@ -729,6 +765,31 @@ namespace RTC
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}

void SimpleConsumer::StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

MS_DEBUG_DEV(
"storing packet in target layer retransmission buffer [seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSequenceNumber(),
packet->GetTimestamp());

// Store original packet into the buffer. Only clone once and only if
// necessary.
if (!sharedPacket)
{
sharedPacket.reset(packet->Clone());
}

this->targetLayerRetransmissionBuffer[packet->GetSequenceNumber()] = sharedPacket;

if (this->targetLayerRetransmissionBuffer.size() > TargetLayerRetransmissionBufferSize)
{
this->targetLayerRetransmissionBuffer.erase(this->targetLayerRetransmissionBuffer.begin());
}
}

inline void SimpleConsumer::EmitScore() const
{
MS_TRACE();
Expand Down
Loading