Skip to content

Commit 1209fc9

Browse files
committed
Fix writer deadlocks in nack
1 parent d12daa5 commit 1209fc9

File tree

4 files changed

+226
-52
lines changed

4 files changed

+226
-52
lines changed

pkg/nack/generator_interceptor.go

Lines changed: 52 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -156,66 +156,71 @@ func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
156156
for {
157157
select {
158158
case <-ticker.C:
159-
func() {
160-
n.receiveLogsMu.Lock()
161-
defer n.receiveLogsMu.Unlock()
159+
// save NACKs to send without holding the mutex during Write
160+
var toSend []rtcp.Packet
162161

163-
for ssrc, receiveLog := range n.receiveLogs {
164-
missing := receiveLog.missingSeqNumbers(n.skipLastN, missingPacketSeqNums)
162+
n.receiveLogsMu.Lock()
163+
for ssrc, receiveLog := range n.receiveLogs {
164+
missing := receiveLog.missingSeqNumbers(n.skipLastN, missingPacketSeqNums)
165165

166-
if len(missing) == 0 || n.nackCountLogs[ssrc] == nil {
167-
n.nackCountLogs[ssrc] = map[uint16]uint16{}
168-
}
169-
if len(missing) == 0 {
170-
continue
171-
}
172-
173-
nack := &rtcp.TransportLayerNack{} // nolint:ineffassign,wastedassign
174-
175-
c := 0 // nolint:varnamelen,
176-
if n.maxNacksPerPacket > 0 {
177-
for _, missingSeq := range missing {
178-
if n.nackCountLogs[ssrc][missingSeq] < n.maxNacksPerPacket {
179-
filteredMissingPacket[c] = missingSeq
180-
c++
181-
}
182-
n.nackCountLogs[ssrc][missingSeq]++
183-
}
166+
if len(missing) == 0 || n.nackCountLogs[ssrc] == nil {
167+
n.nackCountLogs[ssrc] = map[uint16]uint16{}
168+
}
169+
if len(missing) == 0 {
170+
continue
171+
}
184172

185-
if c == 0 {
186-
continue
187-
}
173+
var nack *rtcp.TransportLayerNack
188174

189-
nack = &rtcp.TransportLayerNack{
190-
SenderSSRC: senderSSRC,
191-
MediaSSRC: ssrc,
192-
Nacks: rtcp.NackPairsFromSequenceNumbers(filteredMissingPacket[:c]),
193-
}
194-
} else {
195-
nack = &rtcp.TransportLayerNack{
196-
SenderSSRC: senderSSRC,
197-
MediaSSRC: ssrc,
198-
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
175+
count := 0
176+
if n.maxNacksPerPacket > 0 {
177+
for _, missingSeq := range missing {
178+
if n.nackCountLogs[ssrc][missingSeq] < n.maxNacksPerPacket {
179+
filteredMissingPacket[count] = missingSeq
180+
count++
199181
}
182+
n.nackCountLogs[ssrc][missingSeq]++
200183
}
201184

202-
for nackSeq := range n.nackCountLogs[ssrc] {
203-
isMissing := slices.Contains(missing, nackSeq)
204-
if !isMissing {
205-
delete(n.nackCountLogs[ssrc], nackSeq)
206-
}
185+
if count == 0 {
186+
continue
207187
}
208188

209-
// clean up the count log for the ssrc if it's empty
210-
if len(n.nackCountLogs[ssrc]) == 0 {
211-
delete(n.nackCountLogs, ssrc)
189+
nack = &rtcp.TransportLayerNack{
190+
SenderSSRC: senderSSRC,
191+
MediaSSRC: ssrc,
192+
Nacks: rtcp.NackPairsFromSequenceNumbers(filteredMissingPacket[:count]),
193+
}
194+
} else {
195+
nack = &rtcp.TransportLayerNack{
196+
SenderSSRC: senderSSRC,
197+
MediaSSRC: ssrc,
198+
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
212199
}
200+
}
213201

214-
if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
215-
n.log.Warnf("failed sending nack: %+v", err)
202+
for nackSeq := range n.nackCountLogs[ssrc] {
203+
if !slices.Contains(missing, nackSeq) {
204+
delete(n.nackCountLogs[ssrc], nackSeq)
216205
}
217206
}
218-
}()
207+
208+
// clean up the count log for the ssrc if it's empty
209+
if len(n.nackCountLogs[ssrc]) == 0 {
210+
delete(n.nackCountLogs, ssrc)
211+
}
212+
213+
toSend = append(toSend, nack)
214+
}
215+
n.receiveLogsMu.Unlock()
216+
217+
// send RTCP without holding receiveLogsMu
218+
for _, pkt := range toSend {
219+
if _, err := rtcpWriter.Write([]rtcp.Packet{pkt}, interceptor.Attributes{}); err != nil {
220+
n.log.Warnf("failed sending nack: %+v", err)
221+
}
222+
}
223+
219224
case <-n.close:
220225
return
221226
}

pkg/nack/generator_interceptor_test.go

Lines changed: 82 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -179,3 +179,85 @@ func TestGeneratorInterceptor_UnbindRemovesCorrespondingSSRC(t *testing.T) {
179179
_, ok = gen.nackCountLogs[ssrc]
180180
assert.False(t, ok, "ssrc should not be present in nackCountLogs")
181181
}
182+
183+
// reentrantRTCPWriter tries to re-acquire GeneratorInterceptor.receiveLogsMu
184+
// inside Write. If loop() calls Write while holding that mutex, this will
185+
// cause a deadlock.
186+
type reentrantRTCPWriter struct {
187+
n *GeneratorInterceptor
188+
called chan struct{}
189+
}
190+
191+
func (w *reentrantRTCPWriter) Write(pkts []rtcp.Packet, attrs interceptor.Attributes) (int, error) {
192+
// signal to the test that Write was entered.
193+
select {
194+
case <-w.called:
195+
// already closed
196+
default:
197+
close(w.called)
198+
}
199+
200+
// re-enter the interceptor's lock
201+
w.n.receiveLogsMu.Lock()
202+
defer w.n.receiveLogsMu.Unlock()
203+
204+
return len(pkts), nil
205+
}
206+
207+
// this fails if loop() calls rtcpWriter.Write while holding receiveLogsMu
208+
// but will pass if Write is called after releasing receiveLogsMu.
209+
func TestGeneratorInterceptor_NoDeadlockWithReentrantRTCPWriter(t *testing.T) {
210+
const interval = time.Millisecond * 5
211+
212+
f, err := NewGeneratorInterceptor(
213+
GeneratorSize(64),
214+
GeneratorInterval(interval),
215+
)
216+
assert.NoError(t, err)
217+
218+
i, err := f.NewInterceptor("")
219+
assert.NoError(t, err)
220+
gen, ok := i.(*GeneratorInterceptor)
221+
assert.True(t, ok, "expected *GeneratorInterceptor, got %T", i)
222+
223+
writer := &reentrantRTCPWriter{
224+
n: gen,
225+
called: make(chan struct{}),
226+
}
227+
_ = gen.BindRTCPWriter(writer)
228+
229+
// set receiveLog with a gap so that missingSeqNumbers()
230+
// returns something and causes a NACK -> Write() call.
231+
rl, err := newReceiveLog(gen.size)
232+
assert.NoError(t, err)
233+
234+
gen.receiveLogsMu.Lock()
235+
gen.receiveLogs[1] = rl
236+
gen.receiveLogsMu.Unlock()
237+
238+
// 100 and 102 received -> 101 is missing.
239+
rl.add(100)
240+
rl.add(102)
241+
242+
// wait until the writer was actually called at least once.
243+
select {
244+
case <-writer.called:
245+
// good: generator loop attempted to send a NACK
246+
case <-time.After(time.Second):
247+
assert.Fail(t, "generator did not call RTCP writer")
248+
}
249+
250+
// verify that Close() does not deadlock.
251+
done := make(chan struct{})
252+
go func() {
253+
_ = gen.Close()
254+
close(done)
255+
}()
256+
257+
select {
258+
case <-done:
259+
// no deadlock with reentrant writer
260+
case <-time.After(time.Second):
261+
assert.Fail(t, "GeneratorInterceptor.Close deadlocked with reentrant RTCP writer")
262+
}
263+
}

pkg/nack/responder_interceptor.go

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -126,10 +126,10 @@ func (n *ResponderInterceptor) BindLocalStream(
126126
if err != nil {
127127
return 0, err
128128
}
129-
stream.rtpBufferMutex.Lock()
130-
defer stream.rtpBufferMutex.Unlock()
131129

132-
rtpBuffer.Add(pkt)
130+
stream.rtpBufferMutex.Lock()
131+
stream.rtpBuffer.Add(pkt)
132+
stream.rtpBufferMutex.Unlock()
133133

134134
return writer.Write(header, payload, attributes)
135135
},
@@ -153,10 +153,13 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
153153

154154
for i := range nack.Nacks {
155155
nack.Nacks[i].Range(func(seq uint16) bool {
156+
// save the packet under the buffer lock
156157
stream.rtpBufferMutex.Lock()
157-
defer stream.rtpBufferMutex.Unlock()
158+
p := stream.rtpBuffer.Get(seq)
159+
stream.rtpBufferMutex.Unlock()
158160

159-
if p := stream.rtpBuffer.Get(seq); p != nil {
161+
if p != nil {
162+
// send without holding rtpBufferMutex
160163
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
161164
n.log.Warnf("failed resending nacked packet: %+v", err)
162165
}

pkg/nack/responder_interceptor_test.go

Lines changed: 84 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -398,3 +398,87 @@ func TestResponderInterceptor_BypassUnknownSSRCs(t *testing.T) {
398398
}
399399
}
400400
}
401+
402+
// reentrantRTPWriter tries to re-acquire localStream.rtpBufferMutex inside Write.
403+
// If BindLocalStream's wrapper calls writer.Write while holding that mutex, this
404+
// will deadlock.
405+
type reentrantRTPWriter struct {
406+
stream *localStream
407+
called chan struct{}
408+
}
409+
410+
func (w *reentrantRTPWriter) Write(header *rtp.Header, payload []byte, attrs interceptor.Attributes) (int, error) {
411+
// signal to the test that Write was entered.
412+
if w.called != nil {
413+
select {
414+
case <-w.called:
415+
// already closed
416+
default:
417+
close(w.called)
418+
}
419+
}
420+
421+
// re-enter the same mutex.
422+
w.stream.rtpBufferMutex.Lock()
423+
defer w.stream.rtpBufferMutex.Unlock()
424+
425+
return len(payload), nil
426+
}
427+
428+
// this fails if writer.Write is called while holding rtpBufferMutex and
429+
// will pass when the mutex is released before calling writer.Write.
430+
func TestResponderInterceptor_NoDeadlockWithReentrantRTPWriter(t *testing.T) {
431+
f, err := NewResponderInterceptor(
432+
ResponderSize(8),
433+
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
434+
)
435+
require.NoError(t, err)
436+
437+
i, err := f.NewInterceptor("")
438+
require.NoError(t, err)
439+
440+
resp, ok := i.(*ResponderInterceptor)
441+
require.True(t, ok, "expected *ResponderInterceptor, got %T", i)
442+
443+
info := &interceptor.StreamInfo{
444+
SSRC: 1,
445+
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
446+
}
447+
448+
writer := &reentrantRTPWriter{
449+
called: make(chan struct{}),
450+
}
451+
452+
// BindLocalStream wraps the writer and stores a localStream in resp.streams.
453+
wrapped := resp.BindLocalStream(info, writer)
454+
455+
// fill the writer with the actual localStream instance.
456+
resp.streamsMu.Lock()
457+
writer.stream = resp.streams[info.SSRC]
458+
resp.streamsMu.Unlock()
459+
require.NotNil(t, writer.stream, "localStream should not be nil")
460+
461+
header := &rtp.Header{SSRC: 1, SequenceNumber: 1}
462+
payload := []byte{0x01}
463+
464+
done := make(chan struct{})
465+
go func() {
466+
_, _ = wrapped.Write(header, payload, interceptor.Attributes{})
467+
close(done)
468+
}()
469+
470+
// make sure the reentrant writer was actually invoked.
471+
select {
472+
case <-writer.called:
473+
// good: reentrant path hit
474+
case <-time.After(time.Second):
475+
assert.Fail(t, "wrapped writer was never called")
476+
}
477+
478+
select {
479+
case <-done:
480+
// no deadlock with reentrant writer
481+
case <-time.After(time.Second):
482+
assert.Fail(t, "ResponderInterceptor.Write deadlocked with reentrant RTP writer")
483+
}
484+
}

0 commit comments

Comments
 (0)