Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .github/workflows/mediasoup-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ jobs:
run-test-asan-address: true
run-test-asan-undefined: false
run-test-asan-thread: true
- os: ubuntu-24.04
cc: gcc
cxx: g++
pip-break-system-packages: true
run-lint: true
run-test: true
run-test-asan-address: true
run-test-asan-undefined: false
run-test-asan-thread: true
# Let's just compile with all Meson option flags enabled once with gcc.
meson_args: '-Dms_log_trace=true -Dms_log_file_line=true -Dms_rtc_logger_rtp=true -Dms_disable_liburing=true -Dms_sctp_stack=true'
- os: ubuntu-24.04
cc: clang
cxx: clang++
pip-break-system-packages: true
run-lint: true
run-test: true
run-test-asan-address: true
run-test-asan-undefined: true
run-test-asan-thread: true
- os: ubuntu-24.04
cc: clang
cxx: clang++
Expand All @@ -62,6 +82,8 @@ jobs:
run-test-asan-address: true
run-test-asan-undefined: true
run-test-asan-thread: true
# Let's just compile with all Meson option flags enabled once with clang.
meson_args: '-Dms_log_trace=true -Dms_log_file_line=true -Dms_rtc_logger_rtp=true -Dms_disable_liburing=true -Dms_sctp_stack=true'
- os: ubuntu-24.04-arm
cc: gcc
cxx: g++
Expand Down Expand Up @@ -147,6 +169,7 @@ jobs:
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'false'
MEDIASOUP_BUILDTYPE: ${{ matrix.build-type }}
MESON_ARGS: ${{ matrix.build.meson_args }}

steps:
- name: Checkout
Expand Down
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

- libuv: Update to v1.51.0 ([PR #1543](https://github.com/versatica/mediasoup/pull/1543)).
- libsrtp: Update to v3.0.0-beta version in our fork ([PR #1544](https://github.com/versatica/mediasoup/pull/1544)).
- `XxxxConsumer.cpp`: Only drop packets in RTP sequence manager when they belong to current spatial layer ([PR #1549](https://github.com/versatica/mediasoup/pull/1549)).
- `Consumer` classes: Only drop packets in RTP sequence manager when they belong to current spatial layer ([PR #1549](https://github.com/versatica/mediasoup/pull/1549)).
- `Consumer` classes: Add target layer retransmission buffer to avoid PLIs/FIRs when RTP packets containing a key frame arrive out of order ([PR #1550](https://github.com/versatica/mediasoup/pull/1550)).

### 3.16.0

Expand Down Expand Up @@ -785,7 +786,7 @@ Migrate `npm-scripts.js` to `npm-scripts.mjs` (ES Module) ([PR #1093](https://gi

### 3.6.35

- `XxxxConsumer.hpp`: make `IsActive()` return `true` (even if `Producer`'s score is 0) when DTX is enabled ([PR #534](https://github.com/versatica/mediasoup/pull/534) due to issue #532).
- `Consumer` classes: make `IsActive()` return `true` (even if `Producer`'s score is 0) when DTX is enabled ([PR #534](https://github.com/versatica/mediasoup/pull/534) due to issue #532).

### 3.6.34

Expand Down
12 changes: 12 additions & 0 deletions worker/include/RTC/PipeConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "RTC/Consumer.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include <map>

namespace RTC
{
Expand Down Expand Up @@ -59,6 +60,11 @@ namespace RTC
void UserOnResumed() override;
void CreateRtpStreams();
void RequestKeyFrame();
void StorePacketInTargetLayerRetransmissionBuffer(
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>&
targetLayerRetransmissionBuffer,
RTC::RtpPacket* packet,
std::shared_ptr<RTC::RtpPacket>& sharedPacket);

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
public:
Expand All @@ -75,6 +81,12 @@ namespace RTC
absl::flat_hash_map<RTC::RtpStreamSend*, bool> mapRtpStreamSyncRequired;
absl::flat_hash_map<RTC::RtpStreamSend*, std::unique_ptr<RTC::SeqManager<uint16_t>>>
mapRtpStreamRtpSeqManager;
// Buffers to store packets that arrive earlier than the first packet of the
// video key frame.
absl::flat_hash_map<
RTC::RtpStreamSend*,
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>>
mapRtpStreamTargetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
7 changes: 7 additions & 0 deletions worker/include/RTC/SimpleConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "RTC/Consumer.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include <map>

namespace RTC
{
Expand Down Expand Up @@ -72,6 +73,8 @@ namespace RTC
void UserOnResumed() override;
void CreateRtpStream();
void RequestKeyFrame();
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void EmitScore() const;

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
Expand All @@ -90,6 +93,10 @@ namespace RTC
std::unique_ptr<RTC::SeqManager<uint16_t>> rtpSeqManager;
bool managingBitrate{ false };
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
// Buffer to store packets that arrive earlier than the first packet of the
// video key frame.
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
10 changes: 9 additions & 1 deletion worker/include/RTC/SimulcastConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "RTC/Consumer.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include <map>

namespace RTC
{
Expand Down Expand Up @@ -95,6 +96,8 @@ namespace RTC
void UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer);
bool CanSwitchToSpatialLayer(int16_t spatialLayer) const;
void EmitScore() const;
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void EmitLayersChange() const;
RTC::RtpStreamRecv* GetProducerCurrentRtpStream() const;
RTC::RtpStreamRecv* GetProducerTargetRtpStream() const;
Expand Down Expand Up @@ -129,7 +132,12 @@ namespace RTC
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
uint32_t tsOffset{ 0u }; // RTP Timestamp offset.
bool keyFrameForTsOffsetRequested{ false };
uint64_t lastBweDowngradeAtMs{ 0u }; // Last time we moved to lower spatial layer due to BWE.
// Last time we moved to lower spatial layer due to BWE.
uint64_t lastBweDowngradeAtMs{ 0u };
// Buffer to store packets that arrive earlier than the first packet of the
// video key frame.
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
9 changes: 8 additions & 1 deletion worker/include/RTC/SvcConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ namespace RTC
bool RecalculateTargetLayers(int16_t& newTargetSpatialLayer, int16_t& newTargetTemporalLayer) const;
void UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer);
void EmitScore() const;
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void EmitLayersChange() const;

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
Expand All @@ -106,7 +108,12 @@ namespace RTC
int16_t provisionalTargetSpatialLayer{ -1 };
int16_t provisionalTargetTemporalLayer{ -1 };
std::unique_ptr<RTC::Codecs::EncodingContext> encodingContext;
uint64_t lastBweDowngradeAtMs{ 0u }; // Last time we moved to lower spatial layer due to BWE.
// Last time we moved to lower spatial layer due to BWE.
uint64_t lastBweDowngradeAtMs{ 0u };
// Buffer to store packets that arrive earlier than the first packet of the
// video key frame.
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
};
} // namespace RTC

Expand Down
2 changes: 1 addition & 1 deletion worker/meson_options.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
option('ms_log_trace', type : 'boolean', value : false, description : 'When set to true, logs the current method/function if current log level is "debug"')
option('ms_log_file_line', type : 'boolean', value : false, description : 'When set to true, all the logging macros print more verbose information, including current file and line')
option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each RTP packet')
option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each processed RTP packet')
option('ms_disable_liburing', type : 'boolean', value : false, description : 'When set to true, disables liburing integration despite current host supports it')
option('ms_sctp_stack', type : 'boolean', value : false, description : 'When set to true, uses mediasoup SCTP stack instead of usrsctp library')
106 changes: 104 additions & 2 deletions worker/src/RTC/PipeConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include "RTC/Codecs/Tools.hpp"
#ifdef MS_RTC_LOGGER_RTP
#include "RTC/RtcLogger.hpp"
#endif

namespace RTC
{
/* Static. */

static constexpr size_t TargetLayerRetransmissionBufferSize{ 15u };

/* Instance methods. */

PipeConsumer::PipeConsumer(
Expand Down Expand Up @@ -52,9 +59,13 @@ namespace RTC
{
delete rtpStream;
}

this->rtpStreams.clear();
this->mapMappedSsrcSsrc.clear();
this->mapSsrcRtpStream.clear();
this->mapRtpStreamSyncRequired.clear();
this->mapRtpStreamRtpSeqManager.clear();
this->mapRtpStreamTargetLayerRetransmissionBuffer.clear();
}

flatbuffers::Offset<FBS::Consumer::DumpResponse> PipeConsumer::FillBuffer(
Expand Down Expand Up @@ -226,6 +237,8 @@ namespace RTC
auto* rtpStream = this->mapSsrcRtpStream.at(ssrc);
auto& syncRequired = this->mapRtpStreamSyncRequired.at(rtpStream);
auto& rtpSeqManager = this->mapRtpStreamRtpSeqManager.at(rtpStream);
auto& targetLayerRetransmissionBuffer =
this->mapRtpStreamTargetLayerRetransmissionBuffer.at(rtpStream);

if (!IsActive())
{
Expand All @@ -250,6 +263,11 @@ namespace RTC
// we are blocking all packets but the key frame that would trigger sync
// below.

// Store the packet for the scenario in which this packet is part of the
// key frame and it arrived before the first packet of the key frame.
StorePacketInTargetLayerRetransmissionBuffer(
targetLayerRetransmissionBuffer, packet, sharedPacket);

return;
}

Expand Down Expand Up @@ -285,6 +303,10 @@ namespace RTC
// Whether this is the first packet after re-sync.
const bool isSyncPacket = syncRequired;

// Whether packets stored in the target layer retransmission buffer must be
// sent once this packet is sent.
bool sendPacketsInTargetLayerRetransmissionBuffer{ false };

// Sync sequence number and timestamp if required.
if (isSyncPacket)
{
Expand All @@ -296,6 +318,8 @@ namespace RTC
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());

sendPacketsInTargetLayerRetransmissionBuffer = true;
}

rtpSeqManager->Sync(packet->GetSequenceNumber() - 1);
Expand Down Expand Up @@ -334,8 +358,9 @@ namespace RTC
origSeq);
}

// Process the packet.
if (rtpStream->ReceivePacket(packet, sharedPacket))
const bool sent = rtpStream->ReceivePacket(packet, sharedPacket);

if (sent)
{
// Send the packet.
this->listener->OnConsumerSendRtpPacket(this, packet);
Expand Down Expand Up @@ -363,6 +388,37 @@ namespace RTC
// Restore packet fields.
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);

// If sent packet was the first packet of a key frame, let's send buffered
// packets belonging to the same key frame that arrived earlier due to
// packet misorder.
if (sendPacketsInTargetLayerRetransmissionBuffer)
{
// NOTE: Only send buffered packets if the first packet containing the key
// frame was sent.
if (sent)
{
for (auto& kv : targetLayerRetransmissionBuffer)
{
auto& bufferedSharedPacket = kv.second;
auto* bufferedPacket = bufferedSharedPacket.get();

if (bufferedPacket->GetSequenceNumber() > origSeq)
{
MS_DEBUG_DEV(
"sending packet buffered in the target layer retransmission buffer [ssrc:%" PRIu32
", seq:%" PRIu16 ", ts:%" PRIu32 "]",
bufferedPacket->GetSsrc(),
bufferedPacket->GetSequenceNumber(),
bufferedPacket->GetTimestamp());

SendRtpPacket(bufferedPacket, bufferedSharedPacket);
}
}
}

targetLayerRetransmissionBuffer.clear();
}
}

bool PipeConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
Expand Down Expand Up @@ -574,6 +630,13 @@ namespace RTC
{
rtpStream->Pause();
}

for (auto& kv : this->mapRtpStreamTargetLayerRetransmissionBuffer)
{
auto& targetLayerRetransmissionBuffer = kv.second;

targetLayerRetransmissionBuffer.clear();
}
}

void PipeConsumer::UserOnPaused()
Expand All @@ -584,6 +647,13 @@ namespace RTC
{
rtpStream->Pause();
}

for (auto& kv : this->mapRtpStreamTargetLayerRetransmissionBuffer)
{
auto& targetLayerRetransmissionBuffer = kv.second;

targetLayerRetransmissionBuffer.clear();
}
}

void PipeConsumer::UserOnResumed()
Expand Down Expand Up @@ -707,6 +777,8 @@ namespace RTC

this->mapRtpStreamRtpSeqManager[rtpStream].reset(
new RTC::SeqManager<uint16_t>(initialOutputSeq));

this->mapRtpStreamTargetLayerRetransmissionBuffer[rtpStream];
}
}

Expand All @@ -727,6 +799,36 @@ namespace RTC
}
}

void PipeConsumer::StorePacketInTargetLayerRetransmissionBuffer(
std::map<uint16_t, std::shared_ptr<RTC::RtpPacket>, RTC::SeqManager<uint16_t>::SeqLowerThan>&
targetLayerRetransmissionBuffer,
RTC::RtpPacket* packet,
std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

MS_DEBUG_DEV(
"storing packet in target layer retransmission buffer [ssrc:%" PRIu32 ", seq:%" PRIu16
", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());

// Store original packet into the buffer. Only clone once and only if
// necessary.
if (!sharedPacket)
{
sharedPacket.reset(packet->Clone());
}

targetLayerRetransmissionBuffer[packet->GetSequenceNumber()] = sharedPacket;

if (targetLayerRetransmissionBuffer.size() > TargetLayerRetransmissionBufferSize)
{
targetLayerRetransmissionBuffer.erase(targetLayerRetransmissionBuffer.begin());
}
}

void PipeConsumer::OnRtpStreamScore(
RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/)
{
Expand Down
Loading
Loading