Skip to content

Commit d34a3c5

Browse files
committed
Move buffer for RTP packets into internal
Can be used by NACK and JitterBuffer now Relates to #278
1 parent 0eab188 commit d34a3c5

File tree

11 files changed

+245
-237
lines changed

11 files changed

+245
-237
lines changed

internal/rtpbuffer/errors.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package rtpbuffer
5+
6+
import "errors"
7+
8+
// ErrInvalidSize is returned by newReceiveLog/newRTPBuffer, when an incorrect buffer size is supplied.
9+
var ErrInvalidSize = errors.New("invalid buffer size")
10+
11+
var (
12+
errPacketReleased = errors.New("could not retain packet, already released")
13+
errFailedToCastHeaderPool = errors.New("could not access header pool, failed cast")
14+
errFailedToCastPayloadPool = errors.New("could not access payload pool, failed cast")
15+
)
Lines changed: 21 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
22
// SPDX-License-Identifier: MIT
33

4-
package nack
4+
package rtpbuffer
55

66
import (
77
"encoding/binary"
@@ -11,16 +11,22 @@ import (
1111
"github.com/pion/rtp"
1212
)
1313

14-
const maxPayloadLen = 1460
14+
// PacketFactory allows custom logic around the handle of RTP Packets before they added to the RTPBuffer.
15+
// The NoOpPacketFactory doesn't copy packets, while the RetainablePacket will take a copy before adding
16+
type PacketFactory interface {
17+
NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error)
18+
}
1519

16-
type packetManager struct {
20+
// PacketFactoryCopy is PacketFactory that takes a copy of packets when added to the RTPBuffer
21+
type PacketFactoryCopy struct {
1722
headerPool *sync.Pool
1823
payloadPool *sync.Pool
1924
rtxSequencer rtp.Sequencer
2025
}
2126

22-
func newPacketManager() *packetManager {
23-
return &packetManager{
27+
// NewPacketFactoryCopy constructs a PacketFactory that takes a copy of packets when added to the RTPBuffer
28+
func NewPacketFactoryCopy() *PacketFactoryCopy {
29+
return &PacketFactoryCopy{
2430
headerPool: &sync.Pool{
2531
New: func() interface{} {
2632
return &rtp.Header{}
@@ -36,12 +42,13 @@ func newPacketManager() *packetManager {
3642
}
3743
}
3844

39-
func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*retainablePacket, error) {
45+
// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
46+
func (m *PacketFactoryCopy) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*RetainablePacket, error) {
4047
if len(payload) > maxPayloadLen {
4148
return nil, io.ErrShortBuffer
4249
}
4350

44-
p := &retainablePacket{
51+
p := &RetainablePacket{
4552
onRelease: m.releasePacket,
4653
sequenceNumber: header.SequenceNumber,
4754
// new packets have retain count of 1
@@ -92,17 +99,19 @@ func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc ui
9299
return p, nil
93100
}
94101

95-
func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) {
102+
func (m *PacketFactoryCopy) releasePacket(header *rtp.Header, payload *[]byte) {
96103
m.headerPool.Put(header)
97104
if payload != nil {
98105
m.payloadPool.Put(payload)
99106
}
100107
}
101108

102-
type noOpPacketFactory struct{}
109+
// PacketFactoryNoOp is a PacketFactory implementation that doesn't copy packets
110+
type PacketFactoryNoOp struct{}
103111

104-
func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*retainablePacket, error) {
105-
return &retainablePacket{
112+
// NewPacket constructs a new RetainablePacket that can be added to the RTPBuffer
113+
func (f *PacketFactoryNoOp) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*RetainablePacket, error) {
114+
return &RetainablePacket{
106115
onRelease: f.releasePacket,
107116
count: 1,
108117
header: header,
@@ -111,52 +120,6 @@ func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint
111120
}, nil
112121
}
113122

114-
func (f *noOpPacketFactory) releasePacket(_ *rtp.Header, _ *[]byte) {
123+
func (f *PacketFactoryNoOp) releasePacket(_ *rtp.Header, _ *[]byte) {
115124
// no-op
116125
}
117-
118-
type retainablePacket struct {
119-
onRelease func(*rtp.Header, *[]byte)
120-
121-
countMu sync.Mutex
122-
count int
123-
124-
header *rtp.Header
125-
buffer *[]byte
126-
payload []byte
127-
128-
sequenceNumber uint16
129-
}
130-
131-
func (p *retainablePacket) Header() *rtp.Header {
132-
return p.header
133-
}
134-
135-
func (p *retainablePacket) Payload() []byte {
136-
return p.payload
137-
}
138-
139-
func (p *retainablePacket) Retain() error {
140-
p.countMu.Lock()
141-
defer p.countMu.Unlock()
142-
if p.count == 0 {
143-
// already released
144-
return errPacketReleased
145-
}
146-
p.count++
147-
return nil
148-
}
149-
150-
func (p *retainablePacket) Release() {
151-
p.countMu.Lock()
152-
defer p.countMu.Unlock()
153-
p.count--
154-
155-
if p.count == 0 {
156-
// release back to pool
157-
p.onRelease(p.header, p.buffer)
158-
p.header = nil
159-
p.buffer = nil
160-
p.payload = nil
161-
}
162-
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package rtpbuffer
5+
6+
import (
7+
"sync"
8+
9+
"github.com/pion/rtp"
10+
)
11+
12+
// RetainablePacket is a referenced counted RTP packet
13+
type RetainablePacket struct {
14+
onRelease func(*rtp.Header, *[]byte)
15+
16+
countMu sync.Mutex
17+
count int
18+
19+
header *rtp.Header
20+
buffer *[]byte
21+
payload []byte
22+
23+
sequenceNumber uint16
24+
}
25+
26+
// Header returns the RTP Header of the RetainablePacket
27+
func (p *RetainablePacket) Header() *rtp.Header {
28+
return p.header
29+
}
30+
31+
// Payload returns the RTP Payload of the RetainablePacket
32+
func (p *RetainablePacket) Payload() []byte {
33+
return p.payload
34+
}
35+
36+
// Retain increases the reference count of the RetainablePacket
37+
func (p *RetainablePacket) Retain() error {
38+
p.countMu.Lock()
39+
defer p.countMu.Unlock()
40+
if p.count == 0 {
41+
// already released
42+
return errPacketReleased
43+
}
44+
p.count++
45+
return nil
46+
}
47+
48+
// Release decreases the reference count of the RetainablePacket and frees if needed
49+
func (p *RetainablePacket) Release() {
50+
p.countMu.Lock()
51+
defer p.countMu.Unlock()
52+
p.count--
53+
54+
if p.count == 0 {
55+
// release back to pool
56+
p.onRelease(p.header, p.buffer)
57+
p.header = nil
58+
p.buffer = nil
59+
p.payload = nil
60+
}
61+
}

internal/rtpbuffer/rtpbuffer.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
// Package rtpbuffer provides a buffer for storing RTP packets
5+
package rtpbuffer
6+
7+
import (
8+
"fmt"
9+
)
10+
11+
const (
12+
// Uint16SizeHalf is half of a math.Uint16
13+
Uint16SizeHalf = 1 << 15
14+
15+
maxPayloadLen = 1460
16+
)
17+
18+
// RTPBuffer stores RTP packets and allows custom logic around the lifetime of them via the PacketFactory
19+
type RTPBuffer struct {
20+
packets []*RetainablePacket
21+
size uint16
22+
lastAdded uint16
23+
started bool
24+
}
25+
26+
// NewRTPBuffer constructs a new RTPBuffer
27+
func NewRTPBuffer(size uint16) (*RTPBuffer, error) {
28+
allowedSizes := make([]uint16, 0)
29+
correctSize := false
30+
for i := 0; i < 16; i++ {
31+
if size == 1<<i {
32+
correctSize = true
33+
break
34+
}
35+
allowedSizes = append(allowedSizes, 1<<i)
36+
}
37+
38+
if !correctSize {
39+
return nil, fmt.Errorf("%w: %d is not a valid size, allowed sizes: %v", ErrInvalidSize, size, allowedSizes)
40+
}
41+
42+
return &RTPBuffer{
43+
packets: make([]*RetainablePacket, size),
44+
size: size,
45+
}, nil
46+
}
47+
48+
// Add places the RetainablePacket in the RTPBuffer
49+
func (r *RTPBuffer) Add(packet *RetainablePacket) {
50+
seq := packet.sequenceNumber
51+
if !r.started {
52+
r.packets[seq%r.size] = packet
53+
r.lastAdded = seq
54+
r.started = true
55+
return
56+
}
57+
58+
diff := seq - r.lastAdded
59+
if diff == 0 {
60+
return
61+
} else if diff < Uint16SizeHalf {
62+
for i := r.lastAdded + 1; i != seq; i++ {
63+
idx := i % r.size
64+
prevPacket := r.packets[idx]
65+
if prevPacket != nil {
66+
prevPacket.Release()
67+
}
68+
r.packets[idx] = nil
69+
}
70+
}
71+
72+
idx := seq % r.size
73+
prevPacket := r.packets[idx]
74+
if prevPacket != nil {
75+
prevPacket.Release()
76+
}
77+
r.packets[idx] = packet
78+
r.lastAdded = seq
79+
}
80+
81+
// Get returns the RetainablePacket for the requested sequence number
82+
func (r *RTPBuffer) Get(seq uint16) *RetainablePacket {
83+
diff := r.lastAdded - seq
84+
if diff >= Uint16SizeHalf {
85+
return nil
86+
}
87+
88+
if diff >= r.size {
89+
return nil
90+
}
91+
92+
pkt := r.packets[seq%r.size]
93+
if pkt != nil {
94+
if pkt.sequenceNumber != seq {
95+
return nil
96+
}
97+
// already released
98+
if err := pkt.Retain(); err != nil {
99+
return nil
100+
}
101+
}
102+
return pkt
103+
}

0 commit comments

Comments
 (0)