Skip to content

Commit eacb42d

Browse files
committed
Fix locking issues
1 parent 66841af commit eacb42d

File tree

1 file changed

+32
-13
lines changed

1 file changed

+32
-13
lines changed

pkg/ccfb/interceptor.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func NewInterceptor(opts ...Option) (*InterceptorFactory, error) {
9797
func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
9898
in := &Interceptor{
9999
NoOp: interceptor.NoOp{},
100-
lock: sync.Mutex{},
100+
lock: sync.RWMutex{},
101101
log: logging.NewDefaultLoggerFactory().NewLogger("ccfb_interceptor"),
102102
timestamp: time.Now,
103103
convertCCFB: convertCCFB,
@@ -128,7 +128,7 @@ func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor,
128128
// contains a single entry with SSRC=0 if TWCC is used.
129129
type Interceptor struct {
130130
interceptor.NoOp
131-
lock sync.Mutex
131+
lock sync.RWMutex
132132
log logging.LeveledLogger
133133
timestamp func() time.Time
134134
convertCCFB func(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32][]acknowledgement)
@@ -165,8 +165,8 @@ func (i *Interceptor) BindLocalStream(
165165

166166
// nolint
167167
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
168-
i.lock.Lock()
169-
defer i.lock.Unlock()
168+
i.lock.RLock()
169+
defer i.lock.RUnlock()
170170

171171
// If we are using TWCC, we use the sequence number from the TWCC header
172172
// extension and save all TWCC sequence numbers with the same SSRC (0).
@@ -182,9 +182,13 @@ func (i *Interceptor) BindLocalStream(
182182
"Falling back to saving history for CCFB feedback reports. err: %v",
183183
err,
184184
)
185+
i.lock.RUnlock()
186+
i.lock.Lock()
185187
if _, ok := i.ssrcToHistory[ssrc]; !ok {
186188
i.ssrcToHistory[ssrc] = i.historyFactory(i.historySize)
187189
}
190+
i.lock.Unlock()
191+
i.lock.RLock()
188192
} else {
189193
seqNr = twccHdrExt.TransportSequence
190194
ssrc = 0
@@ -220,6 +224,9 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
220224
if err != nil {
221225
return n, attr, err
222226
}
227+
228+
i.lock.RLock()
229+
defer i.lock.RUnlock()
223230
for _, pkt := range pkts {
224231
var reportLists map[uint32][]acknowledgement
225232
var reportDeparture time.Time
@@ -230,15 +237,7 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
230237
reportDeparture, reportLists = i.convertTWCC(fb)
231238
default:
232239
}
233-
ssrcToPrl := map[uint32][]PacketReport{}
234-
for ssrc, reportList := range reportLists {
235-
prl := i.ssrcToHistory[ssrc].getReportForAck(reportList)
236-
if _, ok := ssrcToPrl[ssrc]; !ok {
237-
ssrcToPrl[ssrc] = prl
238-
} else {
239-
ssrcToPrl[ssrc] = append(ssrcToPrl[ssrc], prl...)
240-
}
241-
}
240+
ssrcToPrl := i.mapAckToHistory(reportLists)
242241
res = append(res, Report{
243242
Arrival: now,
244243
Departure: reportDeparture,
@@ -250,3 +249,23 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
250249
return n, attr, err
251250
})
252251
}
252+
253+
func (i *Interceptor) mapAckToHistory(reportLists map[uint32][]acknowledgement) map[uint32][]PacketReport {
254+
ssrcToPrl := map[uint32][]PacketReport{}
255+
for ssrc, reportList := range reportLists {
256+
hist, ok := i.ssrcToHistory[ssrc]
257+
if !ok {
258+
i.log.Warnf("dropping report for unknown SSRC: %v", ssrc)
259+
260+
continue
261+
}
262+
prl := hist.getReportForAck(reportList)
263+
if _, ok := ssrcToPrl[ssrc]; !ok {
264+
ssrcToPrl[ssrc] = prl
265+
} else {
266+
ssrcToPrl[ssrc] = append(ssrcToPrl[ssrc], prl...)
267+
}
268+
}
269+
270+
return ssrcToPrl
271+
}

0 commit comments

Comments
 (0)