Skip to content

Commit f8ecf93

Browse files
committed
WIP: Add interceptor to aggregate CCFB reports
1 parent c06f448 commit f8ecf93

File tree

3 files changed

+256
-0
lines changed

3 files changed

+256
-0
lines changed

pkg/ccfb/ccfb_receiver.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package ccfb
2+
3+
import (
4+
"time"
5+
6+
"github.com/pion/interceptor/internal/ntp"
7+
"github.com/pion/rtcp"
8+
)
9+
10+
type acknowledgement struct {
11+
seqNr uint16
12+
arrived bool
13+
arrival time.Time
14+
ecn rtcp.ECN
15+
}
16+
17+
type acknowledgementList struct {
18+
ts time.Time
19+
acks []acknowledgement
20+
}
21+
22+
func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) map[uint32]acknowledgementList {
23+
result := map[uint32]acknowledgementList{}
24+
referenceTime := ntp.ToTime(uint64(feedback.ReportTimestamp) << 16)
25+
for _, rb := range feedback.ReportBlocks {
26+
result[rb.MediaSSRC] = convertMetricBlock(ts, referenceTime, rb.BeginSequence, rb.MetricBlocks)
27+
}
28+
return result
29+
}
30+
31+
func convertMetricBlock(ts time.Time, referenceTime time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) acknowledgementList {
32+
reports := make([]acknowledgement, len(blocks))
33+
for i, mb := range blocks {
34+
if mb.Received {
35+
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
36+
reports[i] = acknowledgement{
37+
seqNr: seqNrOffset + uint16(i),
38+
arrived: true,
39+
arrival: referenceTime.Add(-delta),
40+
ecn: mb.ECN,
41+
}
42+
} else {
43+
reports[i] = acknowledgement{
44+
seqNr: seqNrOffset + uint16(i),
45+
arrived: false,
46+
arrival: time.Time{},
47+
ecn: 0,
48+
}
49+
}
50+
}
51+
return acknowledgementList{
52+
ts: ts,
53+
acks: reports,
54+
}
55+
}

pkg/ccfb/history.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package ccfb
2+
3+
import (
4+
"errors"
5+
"log"
6+
"time"
7+
8+
"github.com/pion/interceptor/internal/sequencenumber"
9+
"github.com/pion/rtcp"
10+
)
11+
12+
type PacketReportList struct {
13+
Timestamp time.Time
14+
Reports []PacketReport
15+
}
16+
17+
type PacketReport struct {
18+
SeqNr int64
19+
Size uint16
20+
Departure time.Time
21+
Arrived bool
22+
Arrival time.Time
23+
ECN rtcp.ECN
24+
}
25+
26+
type sentPacket struct {
27+
seqNr int64
28+
size uint16
29+
departure time.Time
30+
}
31+
32+
type history struct {
33+
inflight []sentPacket
34+
sentSeqNr *sequencenumber.Unwrapper
35+
ackedSeqNr *sequencenumber.Unwrapper
36+
}
37+
38+
func newHistory() *history {
39+
return &history{
40+
inflight: []sentPacket{},
41+
sentSeqNr: &sequencenumber.Unwrapper{},
42+
ackedSeqNr: &sequencenumber.Unwrapper{},
43+
}
44+
}
45+
46+
func (h *history) add(seqNr uint16, size uint16, departure time.Time) error {
47+
sn := h.sentSeqNr.Unwrap(seqNr)
48+
if len(h.inflight) > 0 && sn < h.inflight[len(h.inflight)-1].seqNr {
49+
return errors.New("sequence number went backwards")
50+
}
51+
h.inflight = append(h.inflight, sentPacket{
52+
seqNr: sn,
53+
size: size,
54+
departure: departure,
55+
})
56+
return nil
57+
}
58+
59+
func (h *history) getReportForAck(al acknowledgementList) PacketReportList {
60+
reports := []PacketReport{}
61+
log.Printf("highest sent: %v", h.inflight[len(h.inflight)-1].seqNr)
62+
for _, pr := range al.acks {
63+
sn := h.ackedSeqNr.Unwrap(pr.seqNr)
64+
i := h.index(sn)
65+
if i > -1 {
66+
reports = append(reports, PacketReport{
67+
SeqNr: sn,
68+
Size: h.inflight[i].size,
69+
Departure: h.inflight[i].departure,
70+
Arrived: pr.arrived,
71+
Arrival: pr.arrival,
72+
ECN: pr.ecn,
73+
})
74+
} else {
75+
panic("got feedback for unknown packet")
76+
}
77+
log.Printf("processed ack for seq nr %v, arrived: %v", sn, pr.arrived)
78+
}
79+
return PacketReportList{
80+
Timestamp: al.ts,
81+
Reports: reports,
82+
}
83+
}
84+
85+
func (h *history) index(seqNr int64) int {
86+
for i := range h.inflight {
87+
if h.inflight[i].seqNr == seqNr {
88+
return i
89+
}
90+
}
91+
return -1
92+
}

pkg/ccfb/interceptor.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package ccfb
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/pion/interceptor"
8+
"github.com/pion/rtcp"
9+
"github.com/pion/rtp"
10+
)
11+
12+
type ccfbAttributesKeyType uint32
13+
14+
const CCFBAttributesKey ccfbAttributesKeyType = iota
15+
16+
type Option func(*Interceptor) error
17+
18+
type InterceptorFactory struct {
19+
opts []Option
20+
}
21+
22+
func NewInterceptor(opts ...Option) (*InterceptorFactory, error) {
23+
return &InterceptorFactory{
24+
opts: opts,
25+
}, nil
26+
}
27+
28+
func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
29+
i := &Interceptor{
30+
NoOp: interceptor.NoOp{},
31+
timestamp: time.Now,
32+
ssrcToHistory: make(map[uint32]*history),
33+
}
34+
for _, opt := range f.opts {
35+
if err := opt(i); err != nil {
36+
return nil, err
37+
}
38+
}
39+
return i, nil
40+
}
41+
42+
type Interceptor struct {
43+
interceptor.NoOp
44+
lock sync.Mutex
45+
timestamp func() time.Time
46+
ssrcToHistory map[uint32]*history
47+
}
48+
49+
// BindLocalStream implements interceptor.Interceptor.
50+
func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
51+
i.lock.Lock()
52+
defer i.lock.Unlock()
53+
i.ssrcToHistory[info.SSRC] = newHistory()
54+
55+
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
56+
i.lock.Lock()
57+
defer i.lock.Unlock()
58+
i.ssrcToHistory[header.SSRC].add(header.SequenceNumber, uint16(header.MarshalSize()+len(payload)), i.timestamp())
59+
return writer.Write(header, payload, attributes)
60+
})
61+
}
62+
63+
// BindRTCPReader implements interceptor.Interceptor.
64+
func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
65+
return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
66+
now := i.timestamp()
67+
68+
n, attr, err := reader.Read(b, a)
69+
if err != nil {
70+
return n, attr, err
71+
}
72+
buf := make([]byte, n)
73+
copy(buf, b[:n])
74+
75+
if attr == nil {
76+
attr = make(interceptor.Attributes)
77+
}
78+
79+
pktReportLists := map[uint32]*PacketReportList{}
80+
81+
pkts, err := attr.GetRTCPPackets(buf)
82+
for _, pkt := range pkts {
83+
switch fb := pkt.(type) {
84+
case *rtcp.CCFeedbackReport:
85+
reportLists := convertCCFB(now, fb)
86+
for ssrc, reportList := range reportLists {
87+
prl := i.ssrcToHistory[ssrc].getReportForAck(reportList)
88+
if l, ok := pktReportLists[ssrc]; !ok {
89+
pktReportLists[ssrc] = &prl
90+
} else {
91+
l.Reports = append(l.Reports, prl.Reports...)
92+
}
93+
}
94+
}
95+
}
96+
attr.Set(CCFBAttributesKey, pktReportLists)
97+
return n, attr, err
98+
})
99+
}
100+
101+
// Close implements interceptor.Interceptor.
102+
func (i *Interceptor) Close() error {
103+
panic("unimplemented")
104+
}
105+
106+
// UnbindLocalStream implements interceptor.Interceptor.
107+
func (i *Interceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
108+
panic("unimplemented")
109+
}

0 commit comments

Comments
 (0)