88 "math"
99 "time"
1010
11+ "github.com/pion/interceptor/pkg/jitterbuffer"
1112 "github.com/pion/rtp"
1213 "github.com/pion/webrtc/v4/pkg/media"
1314)
@@ -16,7 +17,7 @@ import (
1617type SampleBuilder struct {
1718 maxLate uint16 // how many packets to wait until we get a valid Sample
1819 maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
19- buffer [ math . MaxUint16 + 1 ] * rtp. Packet
20+ buffer * jitterbuffer. JitterBuffer
2021 preparedSamples [math .MaxUint16 + 1 ]* media.Sample
2122
2223 // Interface that allows us to take RTP packets to samples
@@ -60,7 +61,7 @@ type SampleBuilder struct {
6061// The depacketizer extracts media samples from RTP packets.
6162// Several depacketizers are available in package github.com/pion/rtp/codecs.
6263func New (maxLate uint16 , depacketizer rtp.Depacketizer , sampleRate uint32 , opts ... Option ) * SampleBuilder {
63- s := & SampleBuilder {maxLate : maxLate , depacketizer : depacketizer , sampleRate : sampleRate }
64+ s := & SampleBuilder {maxLate : maxLate , depacketizer : depacketizer , sampleRate : sampleRate , buffer : jitterbuffer . New ( jitterbuffer . WithMinimumPacketCount ( 1 )) }
6465 for _ , o := range opts {
6566 o (s )
6667 }
@@ -77,7 +78,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
7778 var foundTail * rtp.Packet
7879
7980 for i := location .head ; i != location .tail ; i ++ {
80- if packet := s .buffer [ i ] ; packet != nil {
81+ if packet , _ := s .buffer . PeekAtSequence ( i ) ; packet != nil {
8182 foundHead = packet
8283
8384 break
@@ -89,7 +90,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
8990 }
9091
9192 for i := location .tail - 1 ; i != location .head ; i -- {
92- if packet := s .buffer [ i ] ; packet != nil {
93+ if packet , _ := s .buffer . PeekAtSequence ( i ) ; packet != nil {
9394 foundTail = packet
9495
9596 break
@@ -108,7 +109,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
108109 if location .empty () {
109110 return 0 , false
110111 }
111- packet := s .buffer [ location .head ]
112+ packet , _ := s .buffer . PeekAtSequence ( location .head )
112113 if packet == nil {
113114 return 0 , false
114115 }
@@ -118,7 +119,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
118119
119120func (s * SampleBuilder ) releasePacket (i uint16 ) {
120121 var p * rtp.Packet
121- p , s . buffer [ i ] = s .buffer [ i ], nil
122+ p , _ = s .buffer . PopAtSequence ( i )
122123 if p != nil && s .packetReleaseHandler != nil {
123124 s .packetReleaseHandler (p )
124125 }
@@ -183,7 +184,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) {
183184// Push does not copy the input. If you wish to reuse
184185// this memory make sure to copy before calling Push.
185186func (s * SampleBuilder ) Push (packet * rtp.Packet ) {
186- s .buffer [ packet . SequenceNumber ] = packet
187+ s .buffer . Push ( packet )
187188
188189 switch s .filled .compare (packet .SequenceNumber ) {
189190 case slCompareVoid :
@@ -226,15 +227,19 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
226227
227228 var consume sampleSequenceLocation
228229
229- for i := s .active .head ; s .buffer [i ] != nil && s .active .compare (i ) != slCompareAfter ; i ++ {
230- if s .depacketizer .IsPartitionTail (s .buffer [i ].Marker , s .buffer [i ].Payload ) {
230+ for i := s .active .head ; s .active .compare (i ) != slCompareAfter ; i ++ {
231+ pkt , err := s .buffer .PeekAtSequence (i )
232+ if pkt == nil || err != nil {
233+ break
234+ }
235+ if s .depacketizer .IsPartitionTail (pkt .Marker , pkt .Payload ) {
231236 consume .head = s .active .head
232237 consume .tail = i + 1
233238
234239 break
235240 }
236241 headTimestamp , hasData := s .fetchTimestamp (s .active )
237- if hasData && s . buffer [ i ] .Timestamp != headTimestamp {
242+ if hasData && pkt .Timestamp != headTimestamp {
238243 consume .head = s .active .head
239244 consume .tail = i
240245
@@ -245,8 +250,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
245250 if consume .empty () {
246251 return nil
247252 }
248-
249- if ! purgingBuffers && s . buffer [ consume . tail ] == nil {
253+ pkt , _ := s . buffer . PeekAtSequence ( consume . tail )
254+ if ! purgingBuffers && pkt == nil {
250255 // wait for the next packet after this set of packets to arrive
251256 // to ensure at least one post sample timestamp is known
252257 // (unless we have to release right now)
@@ -258,9 +263,9 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
258263
259264 // scan for any packet after the current and use that time stamp as the diff point
260265 for i := consume .tail ; i < s .active .tail ; i ++ {
261- if s .buffer [ i ] != nil {
262- afterTimestamp = s . buffer [ i ]. Timestamp
263-
266+ pkt , _ := s .buffer . PeekAtSequence ( i )
267+ if pkt != nil {
268+ afterTimestamp = pkt . Timestamp
264269 break
265270 }
266271 }
@@ -270,10 +275,12 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
270275
271276 // prior to decoding all the packets, check if this packet
272277 // would end being disposed anyway
273- if ! s .depacketizer .IsPartitionHead (s .buffer [consume .head ].Payload ) {
278+ pkt , err := s .buffer .PeekAtSequence (consume .head )
279+ if pkt != nil && err == nil && ! s .depacketizer .IsPartitionHead (pkt .Payload ) {
274280 isPadding := false
275281 for i := consume .head ; i != consume .tail ; i ++ {
276- if s .lastSampleTimestamp != nil && * s .lastSampleTimestamp == s .buffer [i ].Timestamp && len (s .buffer [i ].Payload ) == 0 {
282+ pkt , _ := s .buffer .PeekAtSequence (i )
283+ if s .lastSampleTimestamp != nil && * s .lastSampleTimestamp == pkt .Timestamp && len (pkt .Payload ) == 0 {
277284 isPadding = true
278285 }
279286 }
@@ -292,15 +299,23 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
292299 var metadata interface {}
293300 var rtpHeaders []* rtp.Header
294301 for i := consume .head ; i != consume .tail ; i ++ {
295- payload , err := s .depacketizer .Unmarshal (s .buffer [i ].Payload )
302+ pkt , _ := s .buffer .PeekAtSequence (i )
303+ if pkt == nil {
304+ return nil
305+ }
306+ p , err := s .depacketizer .Unmarshal (pkt .Payload )
296307 if err != nil {
297308 return nil
298309 }
299310 if i == consume .head && s .packetHeadHandler != nil {
300311 metadata = s .packetHeadHandler (s .depacketizer )
301312 }
302313 if s .returnRTPHeaders {
303- h := s .buffer [i ].Header .Clone ()
314+ pkt , err := s .buffer .PeekAtSequence (i )
315+ if err != nil {
316+ return nil
317+ }
318+ h := pkt .Header .Clone ()
304319 rtpHeaders = append (rtpHeaders , & h )
305320 }
306321
0 commit comments