Skip to content

Commit 289432d

Browse files
authored
Fix race in nack responder (#311)
1 parent 60fb984 commit 289432d

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

pkg/nack/responder_interceptor.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,12 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
105105

106106
// error is already checked in NewGeneratorInterceptor
107107
rtpBuffer, _ := rtpbuffer.NewRTPBuffer(n.size)
108-
n.streamsMu.Lock()
109-
n.streams[info.SSRC] = &localStream{
108+
stream := &localStream{
110109
rtpBuffer: rtpBuffer,
111110
rtpWriter: writer,
112111
}
112+
n.streamsMu.Lock()
113+
n.streams[info.SSRC] = stream
113114
n.streamsMu.Unlock()
114115

115116
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
@@ -122,8 +123,8 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
122123
if err != nil {
123124
return 0, err
124125
}
125-
n.streams[info.SSRC].rtpBufferMutex.Lock()
126-
defer n.streams[info.SSRC].rtpBufferMutex.Unlock()
126+
stream.rtpBufferMutex.Lock()
127+
defer stream.rtpBufferMutex.Unlock()
127128

128129
rtpBuffer.Add(pkt)
129130

pkg/nack/responder_interceptor_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package nack
55

66
import (
77
"encoding/binary"
8+
"sync"
89
"testing"
910
"time"
1011

@@ -153,6 +154,37 @@ func TestResponderInterceptor_Race(t *testing.T) {
153154
}
154155
}
155156

157+
// this test is only useful when being run with the race detector, it won't fail otherwise:
158+
//
159+
// go test -race ./pkg/nack/
160+
func TestResponderInterceptor_RaceConcurrentStreams(t *testing.T) {
161+
f, err := NewResponderInterceptor(
162+
ResponderSize(32768),
163+
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
164+
)
165+
require.NoError(t, err)
166+
167+
i, err := f.NewInterceptor("")
168+
require.NoError(t, err)
169+
170+
var wg sync.WaitGroup
171+
for j := 0; j < 5; j++ {
172+
stream := test.NewMockStream(&interceptor.StreamInfo{
173+
SSRC: 1,
174+
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
175+
}, i)
176+
wg.Add(1)
177+
go func() {
178+
for seqNum := uint16(0); seqNum < 500; seqNum++ {
179+
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
180+
}
181+
wg.Done()
182+
}()
183+
}
184+
185+
wg.Wait()
186+
}
187+
156188
func TestResponderInterceptor_StreamFilter(t *testing.T) {
157189
f, err := NewResponderInterceptor(
158190
ResponderSize(8),

0 commit comments

Comments
 (0)