From bfd937feba377f0346eae0bf5888826f3c8e49f3 Mon Sep 17 00:00:00 2001 From: Oto Dusek Date: Tue, 29 Oct 2024 17:23:56 +0100 Subject: [PATCH 1/3] Increment rtx rtp packet sequence number only when trasmitted --- internal/rtpbuffer/packet_factory.go | 7 +++++-- pkg/nack/responder_interceptor.go | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/rtpbuffer/packet_factory.go b/internal/rtpbuffer/packet_factory.go index d3f5a12d..06687058 100644 --- a/internal/rtpbuffer/packet_factory.go +++ b/internal/rtpbuffer/packet_factory.go @@ -17,6 +17,7 @@ const rtxSsrcByteLength = 2 // The NoOpPacketFactory doesn't copy packets, while the RetainablePacket will take a copy before adding type PacketFactory interface { NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) + FillSequenceNumber(packet *RetainablePacket) } // PacketFactoryCopy is PacketFactory that takes a copy of packets when added to the RTPBuffer @@ -94,8 +95,6 @@ func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsr p.header.SSRC = rtxSsrc // Rewrite the payload type. p.header.PayloadType = rtxPayloadType - // Rewrite the sequence number. - p.header.SequenceNumber = m.rtxSequencer.NextSequenceNumber() // Remove padding if present. if p.header.Padding && p.payload != nil && len(p.payload) > 0 { paddingLength := int(p.payload[len(p.payload)-1]) @@ -107,6 +106,10 @@ func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsr return p, nil } +func (m *PacketFactoryCopy) FillSequenceNumber(packet *RetainablePacket) { + packet.header.SequenceNumber = m.rtxSequencer.NextSequenceNumber() +} + func (m *PacketFactoryCopy) releasePacket(header *rtp.Header, payload *[]byte) { m.headerPool.Put(header) if payload != nil { diff --git a/pkg/nack/responder_interceptor.go b/pkg/nack/responder_interceptor.go index 58e34301..49bcf381 100644 --- a/pkg/nack/responder_interceptor.go +++ b/pkg/nack/responder_interceptor.go @@ -147,6 +147,7 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) { defer stream.rtpBufferMutex.Unlock() if p := stream.rtpBuffer.Get(seq); p != nil { + n.packetFactory.FillSequenceNumber(p) if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil { n.log.Warnf("failed resending nacked packet: %+v", err) } From d7734aa53671ca02a7b3e4bf3843ea8315678f0e Mon Sep 17 00:00:00 2001 From: Oto Dusek Date: Wed, 30 Oct 2024 15:04:10 +0100 Subject: [PATCH 2/3] Fix test --- internal/rtpbuffer/packet_factory.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/rtpbuffer/packet_factory.go b/internal/rtpbuffer/packet_factory.go index 06687058..a73d72fb 100644 --- a/internal/rtpbuffer/packet_factory.go +++ b/internal/rtpbuffer/packet_factory.go @@ -131,6 +131,9 @@ func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint }, nil } +func (m *PacketFactoryNoOp) FillSequenceNumber(packet *RetainablePacket) { +} + func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) { // no-op } From 1f03e3f8b71fba04d394273bf9f98c33f3e42feb Mon Sep 17 00:00:00 2001 From: Oto Dusek Date: Tue, 18 Mar 2025 14:17:01 +0100 Subject: [PATCH 3/3] Fixed tests --- internal/rtpbuffer/packet_factory.go | 8 -------- pkg/nack/responder_interceptor.go | 5 ++++- pkg/nack/responder_interceptor_test.go | 10 +++++++--- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/internal/rtpbuffer/packet_factory.go b/internal/rtpbuffer/packet_factory.go index a73d72fb..8bb49823 100644 --- a/internal/rtpbuffer/packet_factory.go +++ b/internal/rtpbuffer/packet_factory.go @@ -17,7 +17,6 @@ const rtxSsrcByteLength = 2 // The NoOpPacketFactory doesn't copy packets, while the RetainablePacket will take a copy before adding type PacketFactory interface { NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) - FillSequenceNumber(packet *RetainablePacket) } // PacketFactoryCopy is PacketFactory that takes a copy of packets when added to the RTPBuffer @@ -106,10 +105,6 @@ func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsr return p, nil } -func (m *PacketFactoryCopy) FillSequenceNumber(packet *RetainablePacket) { - packet.header.SequenceNumber = m.rtxSequencer.NextSequenceNumber() -} - func (m *PacketFactoryCopy) releasePacket(header *rtp.Header, payload *[]byte) { m.headerPool.Put(header) if payload != nil { @@ -131,9 +126,6 @@ func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint }, nil } -func (m *PacketFactoryNoOp) FillSequenceNumber(packet *RetainablePacket) { -} - func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) { // no-op } diff --git a/pkg/nack/responder_interceptor.go b/pkg/nack/responder_interceptor.go index 49bcf381..2a2117ac 100644 --- a/pkg/nack/responder_interceptor.go +++ b/pkg/nack/responder_interceptor.go @@ -25,6 +25,7 @@ func (r *ResponderInterceptorFactory) NewInterceptor(_ string) (interceptor.Inte size: 1024, log: logging.NewDefaultLoggerFactory().NewLogger("nack_responder"), streams: map[uint32]*localStream{}, + rtxSequencer: rtp.NewRandomSequencer(), } for _, opt := range r.opts { @@ -54,6 +55,8 @@ type ResponderInterceptor struct { streams map[uint32]*localStream streamsMu sync.Mutex + + rtxSequencer rtp.Sequencer } type localStream struct { @@ -147,7 +150,7 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) { defer stream.rtpBufferMutex.Unlock() if p := stream.rtpBuffer.Get(seq); p != nil { - n.packetFactory.FillSequenceNumber(p) + p.Header().SequenceNumber = n.rtxSequencer.NextSequenceNumber() if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil { n.log.Warnf("failed resending nacked packet: %+v", err) } diff --git a/pkg/nack/responder_interceptor_test.go b/pkg/nack/responder_interceptor_test.go index 019d85e5..9ff6cde5 100644 --- a/pkg/nack/responder_interceptor_test.go +++ b/pkg/nack/responder_interceptor_test.go @@ -76,12 +76,16 @@ func TestResponderInterceptor(t *testing.T) { }, }, }) - + expectedSequenceNumber := uint16(0) // seq number 13 was never sent, so it can't be resent - for _, seqNum := range []uint16{11, 12, 15} { + for range []uint16{11, 12, 15} { select { case p := <-stream.WrittenRTP(): - require.Equal(t, seqNum, p.SequenceNumber) + if expectedSequenceNumber == 0 { + expectedSequenceNumber = p.SequenceNumber + } + require.Equal(t, expectedSequenceNumber, p.SequenceNumber) + expectedSequenceNumber++ case <-time.After(10 * time.Millisecond): t.Fatal("written rtp packet not found") }