88 "math"
99 "time"
1010
11+ "github.com/pion/interceptor/pkg/jitterbuffer"
1112 "github.com/pion/rtp"
1213 "github.com/pion/webrtc/v4/pkg/media"
1314)
@@ -16,7 +17,7 @@ import (
1617type SampleBuilder struct {
1718 maxLate uint16 // how many packets to wait until we get a valid Sample
1819 maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
19- buffer [ math . MaxUint16 + 1 ] * rtp. Packet
20+ buffer * jitterbuffer. JitterBuffer
2021 preparedSamples [math .MaxUint16 + 1 ]* media.Sample
2122
2223 // Interface that allows us to take RTP packets to samples
@@ -60,7 +61,7 @@ type SampleBuilder struct {
6061// The depacketizer extracts media samples from RTP packets.
6162// Several depacketizers are available in package github.com/pion/rtp/codecs.
6263func New (maxLate uint16 , depacketizer rtp.Depacketizer , sampleRate uint32 , opts ... Option ) * SampleBuilder {
63- s := & SampleBuilder {maxLate : maxLate , depacketizer : depacketizer , sampleRate : sampleRate }
64+ s := & SampleBuilder {maxLate : maxLate , depacketizer : depacketizer , sampleRate : sampleRate , buffer : jitterbuffer . New ( jitterbuffer . WithMinimumPacketCount ( 1 )) }
6465 for _ , o := range opts {
6566 o (s )
6667 }
@@ -76,7 +77,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
7677 var foundTail * rtp.Packet
7778
7879 for i := location .head ; i != location .tail ; i ++ {
79- if packet := s .buffer [ i ] ; packet != nil {
80+ if packet , _ := s .buffer . PeekAtSequence ( i ) ; packet != nil {
8081 foundHead = packet
8182 break
8283 }
@@ -87,7 +88,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
8788 }
8889
8990 for i := location .tail - 1 ; i != location .head ; i -- {
90- if packet := s .buffer [ i ] ; packet != nil {
91+ if packet , _ := s .buffer . PeekAtSequence ( i ) ; packet != nil {
9192 foundTail = packet
9293 break
9394 }
@@ -105,7 +106,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
105106 if location .empty () {
106107 return 0 , false
107108 }
108- packet := s .buffer [ location .head ]
109+ packet , _ := s .buffer . PeekAtSequence ( location .head )
109110 if packet == nil {
110111 return 0 , false
111112 }
@@ -114,7 +115,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
114115
115116func (s * SampleBuilder ) releasePacket (i uint16 ) {
116117 var p * rtp.Packet
117- p , s . buffer [ i ] = s .buffer [ i ], nil
118+ p , _ = s .buffer . PopAtSequence ( i )
118119 if p != nil && s .packetReleaseHandler != nil {
119120 s .packetReleaseHandler (p )
120121 }
@@ -178,7 +179,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) {
178179// Push does not copy the input. If you wish to reuse
179180// this memory make sure to copy before calling Push
180181func (s * SampleBuilder ) Push (p * rtp.Packet ) {
181- s .buffer [ p . SequenceNumber ] = p
182+ s .buffer . Push ( p )
182183
183184 switch s .filled .compare (p .SequenceNumber ) {
184185 case slCompareVoid :
@@ -220,14 +221,18 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
220221
221222 var consume sampleSequenceLocation
222223
223- for i := s .active .head ; s .buffer [i ] != nil && s .active .compare (i ) != slCompareAfter ; i ++ {
224- if s .depacketizer .IsPartitionTail (s .buffer [i ].Marker , s .buffer [i ].Payload ) {
224+ for i := s .active .head ; s .active .compare (i ) != slCompareAfter ; i ++ {
225+ pkt , err := s .buffer .PeekAtSequence (i )
226+ if pkt == nil || err != nil {
227+ break
228+ }
229+ if s .depacketizer .IsPartitionTail (pkt .Marker , pkt .Payload ) {
225230 consume .head = s .active .head
226231 consume .tail = i + 1
227232 break
228233 }
229234 headTimestamp , hasData := s .fetchTimestamp (s .active )
230- if hasData && s . buffer [ i ] .Timestamp != headTimestamp {
235+ if hasData && pkt .Timestamp != headTimestamp {
231236 consume .head = s .active .head
232237 consume .tail = i
233238 break
@@ -237,8 +242,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
237242 if consume .empty () {
238243 return nil
239244 }
240-
241- if ! purgingBuffers && s . buffer [ consume . tail ] == nil {
245+ pkt , _ := s . buffer . PeekAtSequence ( consume . tail )
246+ if ! purgingBuffers && pkt == nil {
242247 // wait for the next packet after this set of packets to arrive
243248 // to ensure at least one post sample timestamp is known
244249 // (unless we have to release right now)
@@ -250,8 +255,9 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
250255
251256 // scan for any packet after the current and use that time stamp as the diff point
252257 for i := consume .tail ; i < s .active .tail ; i ++ {
253- if s .buffer [i ] != nil {
254- afterTimestamp = s .buffer [i ].Timestamp
258+ pkt , _ := s .buffer .PeekAtSequence (i )
259+ if pkt != nil {
260+ afterTimestamp = pkt .Timestamp
255261 break
256262 }
257263 }
@@ -261,10 +267,12 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
261267
262268 // prior to decoding all the packets, check if this packet
263269 // would end being disposed anyway
264- if ! s .depacketizer .IsPartitionHead (s .buffer [consume .head ].Payload ) {
270+ pkt , err := s .buffer .PeekAtSequence (consume .head )
271+ if pkt != nil && err == nil && ! s .depacketizer .IsPartitionHead (pkt .Payload ) {
265272 isPadding := false
266273 for i := consume .head ; i != consume .tail ; i ++ {
267- if s .lastSampleTimestamp != nil && * s .lastSampleTimestamp == s .buffer [i ].Timestamp && len (s .buffer [i ].Payload ) == 0 {
274+ pkt , _ := s .buffer .PeekAtSequence (i )
275+ if s .lastSampleTimestamp != nil && * s .lastSampleTimestamp == pkt .Timestamp && len (pkt .Payload ) == 0 {
268276 isPadding = true
269277 }
270278 }
@@ -282,15 +290,23 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
282290 var metadata interface {}
283291 var rtpHeaders []* rtp.Header
284292 for i := consume .head ; i != consume .tail ; i ++ {
285- p , err := s .depacketizer .Unmarshal (s .buffer [i ].Payload )
293+ pkt , _ := s .buffer .PeekAtSequence (i )
294+ if pkt == nil {
295+ return nil
296+ }
297+ p , err := s .depacketizer .Unmarshal (pkt .Payload )
286298 if err != nil {
287299 return nil
288300 }
289301 if i == consume .head && s .packetHeadHandler != nil {
290302 metadata = s .packetHeadHandler (s .depacketizer )
291303 }
292304 if s .returnRTPHeaders {
293- h := s .buffer [i ].Header .Clone ()
305+ pkt , err := s .buffer .PeekAtSequence (i )
306+ if err != nil {
307+ return nil
308+ }
309+ h := pkt .Header .Clone ()
294310 rtpHeaders = append (rtpHeaders , & h )
295311 }
296312
0 commit comments