Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- libuv: Update to v1.51.0 ([PR #1543](https://github.com/versatica/mediasoup/pull/1543)).
- libsrtp: Update to v3.0.0-beta version in our fork ([PR #1544](https://github.com/versatica/mediasoup/pull/1544)).
- `XxxxConsumer`: Add target layer retransmission buffer ([PR #1548](https://github.com/versatica/mediasoup/pull/1548)).

### 3.16.0

Expand Down
7 changes: 7 additions & 0 deletions worker/include/RTC/SimpleConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "RTC/Consumer.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include <map>

namespace RTC
{
Expand Down Expand Up @@ -72,6 +73,8 @@ namespace RTC
void UserOnResumed() override;
void CreateRtpStream();
void RequestKeyFrame();
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void EmitScore() const;

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
Expand All @@ -90,6 +93,10 @@ namespace RTC
std::unique_ptr<RTC::SeqManager<uint16_t>> rtpSeqManager;
bool managingBitrate{ false };
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
// Buffer to store packets that arrive earlier than the first packet of the
// video key frame.
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
7 changes: 7 additions & 0 deletions worker/include/RTC/SimulcastConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "RTC/Consumer.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include <map>

namespace RTC
{
Expand Down Expand Up @@ -94,6 +95,8 @@ namespace RTC
bool RecalculateTargetLayers(int16_t& newTargetSpatialLayer, int16_t& newTargetTemporalLayer) const;
void UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer);
bool CanSwitchToSpatialLayer(int16_t spatialLayer) const;
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void EmitScore() const;
void EmitLayersChange() const;
RTC::RtpStreamRecv* GetProducerCurrentRtpStream() const;
Expand Down Expand Up @@ -130,6 +133,10 @@ namespace RTC
uint32_t tsOffset{ 0u }; // RTP Timestamp offset.
bool keyFrameForTsOffsetRequested{ false };
uint64_t lastBweDowngradeAtMs{ 0u }; // Last time we moved to lower spatial layer due to BWE.
// Buffer to store packets that arrive earlier than the first packet of the
// video key frame.
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
6 changes: 6 additions & 0 deletions worker/include/RTC/SvcConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ namespace RTC
void MayChangeLayers(bool force = false);
bool RecalculateTargetLayers(int16_t& newTargetSpatialLayer, int16_t& newTargetTemporalLayer) const;
void UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer);
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void EmitScore() const;
void EmitLayersChange() const;

Expand All @@ -107,6 +109,10 @@ namespace RTC
int16_t provisionalTargetTemporalLayer{ -1 };
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
uint64_t lastBweDowngradeAtMs{ 0u }; // Last time we moved to lower spatial layer due to BWE.
// Buffer to store packets that arrive earlier than the first packet of the
// video key frame.
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
90 changes: 79 additions & 11 deletions worker/src/RTC/SimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

namespace RTC
{
/* Static. */

static constexpr size_t TargetLayerRetransmissionBufferSize{ 15u };

/* Instance methods. */

SimpleConsumer::SimpleConsumer(
Expand Down Expand Up @@ -74,6 +78,7 @@ namespace RTC
this->shared->channelMessageRegistrator->UnregisterHandler(this->id);

delete this->rtpStream;
this->targetLayerRetransmissionBuffer.clear();
}

flatbuffers::Offset<FBS::Consumer::DumpResponse> SimpleConsumer::FillBuffer(
Expand Down Expand Up @@ -362,31 +367,36 @@ namespace RTC
return;
}

// If we need to sync, support key frames and this is not a key frame, ignore
// the packet.
if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame())
// Packets with only padding are not forwarded.
if (packet->GetPayloadLength() == 0)
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::NOT_A_KEYFRAME);
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::EMPTY_PAYLOAD);
#endif

this->rtpSeqManager->Drop(packet->GetSequenceNumber());

return;
}

// Packets with only padding are not forwarded.
if (packet->GetPayloadLength() == 0)
// If we need to sync, support key frames and this is not a key frame, do not
// send the packet and store it for the scenario in which this packet is part
// of the key frame and arrived before the first packet of the key frame.
if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame())
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::EMPTY_PAYLOAD);
#endif
// No need to drop in the sequence manager the packet since we are blocking
// all packets that are not a key frame and anyway we are syncing below when
// the key frame arrives.

this->rtpSeqManager->Drop(packet->GetSequenceNumber());
StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket);

return;
}

// Whether packets stored in the target layer retransmission buffer must be
// sent once this packet is sent.
bool sendPacketsInTargetLayerRetransmissionBuffer{ false };

// Whether this is the first packet after re-sync.
const bool isSyncPacket = this->syncRequired;

Expand All @@ -395,7 +405,13 @@ namespace RTC
{
if (packet->IsKeyFrame())
{
MS_DEBUG_TAG(rtp, "sync key frame received");
MS_DEBUG_TAG(
rtp,
"sync key frame received [seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSequenceNumber(),
packet->GetTimestamp());

sendPacketsInTargetLayerRetransmissionBuffer = true;
}

this->rtpSeqManager->Sync(packet->GetSequenceNumber() - 1);
Expand Down Expand Up @@ -457,6 +473,31 @@ namespace RTC
// Restore packet fields.
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);

// If sent packet was the first packet of a key frame, we may need to send
// buffered packets belonging to the same key frame that arrived earlier due
// to packet misorder.
if (sendPacketsInTargetLayerRetransmissionBuffer)
{
for (auto& pair : this->targetLayerRetransmissionBuffer)
{
auto& bufferedSharedPacket = pair.second;
auto* bufferedPacket = bufferedSharedPacket.get();

if (bufferedPacket->GetSequenceNumber() > origSeq)
{
MS_DEBUG_DEV(
"sending packet buffered in the target layer retransmission buffer [seq:%" PRIu16
", ts:%" PRIu32 "]",
bufferedPacket->GetSequenceNumber(),
bufferedPacket->GetTimestamp());

SendRtpPacket(bufferedPacket, bufferedSharedPacket);
}
}

this->targetLayerRetransmissionBuffer.clear();
}
}

bool SimpleConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
Expand Down Expand Up @@ -607,13 +648,15 @@ namespace RTC
MS_TRACE();

this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();
}

void SimpleConsumer::UserOnPaused()
{
MS_TRACE();

this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();

if (this->externallyManagedBitrate && this->kind == RTC::Media::Kind::VIDEO)
{
Expand Down Expand Up @@ -729,6 +772,31 @@ namespace RTC
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}

void SimpleConsumer::StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

MS_DEBUG_DEV(
"storing packet in target layer retransmission buffer [seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSequenceNumber(),
packet->GetTimestamp());

// Store original packet into the buffer. Only clone once and only if
// necessary.
if (!sharedPacket)
{
sharedPacket.reset(packet->Clone());
}

this->targetLayerRetransmissionBuffer[packet->GetSequenceNumber()] = sharedPacket;

if (this->targetLayerRetransmissionBuffer.size() > TargetLayerRetransmissionBufferSize)
{
this->targetLayerRetransmissionBuffer.erase(this->targetLayerRetransmissionBuffer.begin());
}
}

inline void SimpleConsumer::EmitScore() const
{
MS_TRACE();
Expand Down
Loading