Skip to content

Commit 079fa7c

Browse files
committed
Add RFC 8888 interceptor
This PR implements an RFC 8888, an alternative to transport wide congestion control.
1 parent b0b785b commit 079fa7c

File tree

14 files changed

+1582
-4
lines changed

14 files changed

+1582
-4
lines changed

AUTHORS.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Aditya Kumar <[email protected]>
1010
1111
Antoine Baché <[email protected]>
1212
Atsushi Watanabe <[email protected]>
13+
Bobby Peck <[email protected]>
1314
1415
David Zhao <[email protected]>
1516
Jonathan Müller <[email protected]>

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.15
44

55
require (
66
github.com/pion/logging v0.2.2
7-
github.com/pion/rtcp v1.2.9
7+
github.com/pion/rtcp v1.2.10
88
github.com/pion/rtp v1.7.13
99
github.com/stretchr/testify v1.7.1
1010
)

go.sum

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
44
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
55
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
66
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
7-
github.com/pion/rtcp v1.2.9 h1:1ujStwg++IOLIEoOiIQ2s+qBuJ1VN81KW+9pMPsif+U=
8-
github.com/pion/rtcp v1.2.9/go.mod h1:qVPhiCzAm4D/rxb6XzKeyZiQK69yJpbUDJSF7TgrqNo=
7+
github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc=
8+
github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I=
99
github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA=
1010
github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
1111
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1212
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1313
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
14-
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
1514
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
1615
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
1716
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

internal/test/mock_ticker.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package test
2+
3+
import (
4+
"time"
5+
)
6+
7+
// MockTicker is a helper to replace time.Ticker for testing purposes.
8+
type MockTicker struct {
9+
C chan time.Time
10+
}
11+
12+
// Stop stops the MockTicker.
13+
func (t *MockTicker) Stop() {
14+
}
15+
16+
// Ch returns the tickers channel
17+
func (t *MockTicker) Ch() <-chan time.Time {
18+
return t.C
19+
}
20+
21+
// Tick sends now to the channel
22+
func (t *MockTicker) Tick(now time.Time) {
23+
t.C <- now
24+
}

pkg/rfc8888/interceptor.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Package rfc8888 provides an interceptor that generates congestion control
2+
// feedback reports as defined by RFC 8888.
3+
package rfc8888
4+
5+
import (
6+
"sync"
7+
"time"
8+
9+
"github.com/pion/interceptor"
10+
"github.com/pion/logging"
11+
"github.com/pion/rtcp"
12+
)
13+
14+
// TickerFactory is a factory to create new tickers
15+
type TickerFactory func(d time.Duration) ticker
16+
17+
// SenderInterceptorFactory is a interceptor.Factory for a SenderInterceptor
18+
type SenderInterceptorFactory struct {
19+
opts []Option
20+
}
21+
22+
// NewInterceptor constructs a new SenderInterceptor
23+
func (s *SenderInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
24+
i := &SenderInterceptor{
25+
NoOp: interceptor.NoOp{},
26+
log: logging.NewDefaultLoggerFactory().NewLogger("rfc8888_interceptor"),
27+
lock: sync.Mutex{},
28+
wg: sync.WaitGroup{},
29+
recorder: NewRecorder(),
30+
interval: 100 * time.Millisecond,
31+
maxReportSize: 1200,
32+
packetChan: make(chan packet),
33+
newTicker: func(d time.Duration) ticker {
34+
return &timeTicker{time.NewTicker(d)}
35+
},
36+
now: time.Now,
37+
close: make(chan struct{}),
38+
}
39+
for _, opt := range s.opts {
40+
err := opt(i)
41+
if err != nil {
42+
return nil, err
43+
}
44+
}
45+
return i, nil
46+
}
47+
48+
// NewSenderInterceptor returns a new SenderInterceptorFactory configured with the given options.
49+
func NewSenderInterceptor(opts ...Option) (*SenderInterceptorFactory, error) {
50+
return &SenderInterceptorFactory{opts: opts}, nil
51+
}
52+
53+
// SenderInterceptor sends congestion control feedback as specified in RFC 8888.
54+
type SenderInterceptor struct {
55+
interceptor.NoOp
56+
log logging.LeveledLogger
57+
lock sync.Mutex
58+
wg sync.WaitGroup
59+
recorder *Recorder
60+
interval time.Duration
61+
maxReportSize int64
62+
packetChan chan packet
63+
newTicker TickerFactory
64+
now func() time.Time
65+
close chan struct{}
66+
}
67+
68+
type packet struct {
69+
arrival time.Time
70+
ssrc uint32
71+
sequenceNumber uint16
72+
ecn uint8
73+
}
74+
75+
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
76+
// will be called once per packet batch.
77+
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
78+
s.lock.Lock()
79+
defer s.lock.Unlock()
80+
81+
if s.isClosed() {
82+
return writer
83+
}
84+
85+
s.wg.Add(1)
86+
go s.loop(writer)
87+
88+
return writer
89+
}
90+
91+
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
92+
// will be called once per rtp packet.
93+
func (s *SenderInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
94+
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
95+
i, attr, err := reader.Read(b, a)
96+
if err != nil {
97+
return 0, nil, err
98+
}
99+
100+
if attr == nil {
101+
attr = make(interceptor.Attributes)
102+
}
103+
header, err := attr.GetRTPHeader(b[:i])
104+
if err != nil {
105+
return 0, nil, err
106+
}
107+
108+
p := packet{
109+
arrival: s.now(),
110+
ssrc: header.SSRC,
111+
sequenceNumber: header.SequenceNumber,
112+
ecn: 0, // ECN is not supported (yet).
113+
}
114+
s.packetChan <- p
115+
return i, attr, nil
116+
})
117+
}
118+
119+
// Close closes the interceptor.
120+
func (s *SenderInterceptor) Close() error {
121+
s.log.Trace("close")
122+
defer s.wg.Wait()
123+
124+
if !s.isClosed() {
125+
close(s.close)
126+
}
127+
128+
return nil
129+
}
130+
131+
func (s *SenderInterceptor) isClosed() bool {
132+
select {
133+
case <-s.close:
134+
return true
135+
default:
136+
return false
137+
}
138+
}
139+
140+
func (s *SenderInterceptor) loop(writer interceptor.RTCPWriter) {
141+
defer s.wg.Done()
142+
143+
select {
144+
case <-s.close:
145+
return
146+
case pkt := <-s.packetChan:
147+
s.log.Tracef("got first packet: %v", pkt)
148+
s.recorder.AddPacket(pkt.arrival, pkt.ssrc, pkt.sequenceNumber, pkt.ecn)
149+
}
150+
151+
s.log.Trace("start loop")
152+
t := s.newTicker(s.interval)
153+
for {
154+
select {
155+
case <-s.close:
156+
t.Stop()
157+
return
158+
159+
case pkt := <-s.packetChan:
160+
s.log.Tracef("got packet: %v", pkt)
161+
s.recorder.AddPacket(pkt.arrival, pkt.ssrc, pkt.sequenceNumber, pkt.ecn)
162+
163+
case now := <-t.Ch():
164+
s.log.Tracef("report triggered at %v", now)
165+
if writer == nil {
166+
s.log.Trace("no writer added, continue")
167+
continue
168+
}
169+
pkts := s.recorder.BuildReport(now, int(s.maxReportSize))
170+
if pkts == nil {
171+
continue
172+
}
173+
s.log.Tracef("got report: %v", pkts)
174+
if _, err := writer.Write([]rtcp.Packet{pkts}, nil); err != nil {
175+
s.log.Error(err.Error())
176+
}
177+
}
178+
}
179+
}

0 commit comments

Comments
 (0)