Skip to content

Commit f7f760f

Browse files
robin-raymondSean DuBois
authored andcommitted
Fix infinitate loop possible in sample builder
Active becomes "empty" after consuming the packets, but instead of the "filled" getting purged by "purgeConsumedBuffers", it doesn't because active is now empty and empty buffers cannot cause any purging. So the solution is to purge the "consumed" area first, then "active". Fixes #1810
1 parent d625f6f commit f7f760f

File tree

2 files changed

+35
-3
lines changed

2 files changed

+35
-3
lines changed

pkg/media/samplebuilder/samplebuilder.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,23 @@ func (s *SampleBuilder) releasePacket(i uint16) {
112112
// purgeConsumedBuffers clears all buffers that have already been consumed by
113113
// popping.
114114
func (s *SampleBuilder) purgeConsumedBuffers() {
115-
for s.active.compare(s.filled.head) == slCompareBefore && s.filled.hasData() {
115+
s.purgeConsumedLocation(s.active, false)
116+
}
117+
118+
// purgeConsumedLocation clears all buffers that have already been consumed
119+
// during a sample building method.
120+
func (s *SampleBuilder) purgeConsumedLocation(consume sampleSequenceLocation, forceConsume bool) {
121+
if !s.filled.hasData() {
122+
return
123+
}
124+
125+
switch consume.compare(s.filled.head) {
126+
case slCompareInside:
127+
if !forceConsume {
128+
break
129+
}
130+
fallthrough
131+
case slCompareBefore:
116132
s.releasePacket(s.filled.head)
117133
s.filled.head++
118134
}
@@ -231,15 +247,16 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
231247
if s.partitionHeadChecker != nil {
232248
if !s.partitionHeadChecker.IsPartitionHead(s.buffer[consume.head].Payload) {
233249
s.droppedPackets += consume.count()
250+
s.purgeConsumedLocation(consume, true)
234251
s.purgeConsumedBuffers()
235252
return nil
236253
}
237254
}
238255

239256
// merge all the buffers into a sample
240257
data := []byte{}
241-
for ; consume.head != consume.tail; consume.head++ {
242-
p, err := s.depacketizer.Unmarshal(s.buffer[consume.head].Payload)
258+
for i := consume.head; i != consume.tail; i++ {
259+
p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
243260
if err != nil {
244261
return nil
245262
}
@@ -259,6 +276,7 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
259276
s.preparedSamples[s.prepared.tail] = sample
260277
s.prepared.tail++
261278

279+
s.purgeConsumedLocation(consume, true)
262280
s.purgeConsumedBuffers()
263281

264282
return sample

pkg/media/samplebuilder/samplebuilder_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,20 @@ func TestSampleBuilderCleanReference(t *testing.T) {
328328
}
329329
}
330330

331+
func TestSampleBuilderPushMaxZero(t *testing.T) {
332+
// Test packets released via 'maxLate' of zero.
333+
pkts := []rtp.Packet{
334+
{Header: rtp.Header{SequenceNumber: 0, Timestamp: 0, Marker: true}, Payload: []byte{0x01}},
335+
}
336+
s := New(0, &fakeDepacketizer{}, 1, WithPartitionHeadChecker(
337+
&fakePartitionHeadChecker{headBytes: []byte{0x01}},
338+
))
339+
s.Push(&pkts[0])
340+
if sample := s.Pop(); sample == nil {
341+
t.Error("Should expect a popped sample")
342+
}
343+
}
344+
331345
func TestSampleBuilderWithPacketReleaseHandler(t *testing.T) {
332346
var released []*rtp.Packet
333347
fakePacketReleaseHandler := func(p *rtp.Packet) {

0 commit comments

Comments
 (0)