Skip to content

Commit 9699a40

Browse files
louislouis
authored andcommitted
jitter: Add automatic buffer reset on large RTP discontinuities
This commit improves the resilience of the jitter buffer by automatically detecting and handling large RTP discontinuities in both sequence numbers and timestamps. New features: - Detect large sequence number jumps (>1000) and timestamp jumps (>30s) - Automatically reset buffer state when discontinuities are detected - Add warning logs for operational visibility when jumps occur - Track previous timestamp to enable timestamp discontinuity detection Bug fixes: - Prevent buffer from staying in invalid state after large gaps - Improve packet expiration handling after discontinuities to reduce premature drops and delayed emission The reset mechanism clears all buffered packets and reinitializes the buffer state, ensuring clean recovery from stream interruptions. Tests: - Add TestLargeSequenceJump to verify sequence discontinuity handling - Add TestLargeTimestampJump to verify timestamp discontinuity handling - Add TestSequenceWraparound to ensure wraparound boundaries work correctly
1 parent a84c3b4 commit 9699a40

File tree

2 files changed

+187
-1
lines changed

2 files changed

+187
-1
lines changed

jitter/buffer.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Buffer struct {
4343

4444
initialized bool
4545
prevSN uint16
46+
prevTS uint32
4647
head *packet
4748
tail *packet
4849

@@ -187,6 +188,75 @@ func (b *Buffer) Close() {
187188
b.closed.Break()
188189
}
189190

191+
func (b *Buffer) isLargeTimestampJump(current, prev uint32) bool {
192+
const MAX_TIMESTAMP_JUMP = 8000 * 30 // 30 seconds at 8kHz
193+
194+
if !b.initialized {
195+
return false
196+
}
197+
198+
cur := int64(current)
199+
prv := int64(prev)
200+
201+
forwardDiff := cur - prv
202+
if forwardDiff < 0 {
203+
forwardDiff += (1 << 32) // handle 32-bit wrap-around
204+
}
205+
206+
backwardDiff := prv - cur
207+
if backwardDiff < 0 {
208+
backwardDiff += (1 << 32) // handle 32-bit wrap-around
209+
}
210+
211+
return min(backwardDiff, forwardDiff) > MAX_TIMESTAMP_JUMP
212+
}
213+
214+
func (b *Buffer) isLargeSequenceJump(current, prev uint16) bool {
215+
const MAX_SEQUENCE_JUMP = 1000
216+
217+
if !b.initialized {
218+
return false
219+
}
220+
221+
cur := int32(current)
222+
prv := int32(prev)
223+
224+
forwardDiff := cur - prv
225+
if forwardDiff < 0 {
226+
forwardDiff += 65536 // handle wrap-around
227+
}
228+
229+
backwardDiff := prv - cur
230+
if backwardDiff < 0 {
231+
backwardDiff += 65536 // handle wrap-around
232+
}
233+
234+
return min(backwardDiff, forwardDiff) > MAX_SEQUENCE_JUMP
235+
}
236+
237+
func (b *Buffer) reset() {
238+
b.logger.Infow("resetting jitter buffer due to RTP discontinuity")
239+
240+
for b.head != nil {
241+
next := b.head.next
242+
b.free(b.head)
243+
b.head = next
244+
}
245+
b.tail = nil
246+
247+
b.initialized = false
248+
b.prevSN = 0
249+
b.prevTS = 0
250+
251+
if !b.timer.Stop() {
252+
select {
253+
case <-b.timer.C:
254+
default:
255+
}
256+
}
257+
b.timer.Reset(b.latency)
258+
}
259+
190260
// push adds a packet to the buffer
191261
func (b *Buffer) push(pkt *rtp.Packet, receivedAt time.Time) {
192262
b.stats.PacketsPushed++
@@ -197,8 +267,19 @@ func (b *Buffer) push(pkt *rtp.Packet, receivedAt time.Time) {
197267
}
198268
}
199269

270+
if b.isLargeTimestampJump(pkt.Timestamp, b.prevTS) ||
271+
b.isLargeSequenceJump(pkt.SequenceNumber, b.prevSN) {
272+
b.logger.Infow("large RTP discontinuity detected",
273+
"current_ts", pkt.Timestamp,
274+
"prev_ts", b.prevTS,
275+
"current_sn", pkt.SequenceNumber,
276+
"prev_sn", b.prevSN,
277+
)
278+
b.reset()
279+
}
280+
200281
if b.initialized && before(pkt.SequenceNumber, b.prevSN) {
201-
// packet expired
282+
// packet expired (not after discontinuity reset)
202283
if !pkt.Padding {
203284
b.stats.PacketsDropped++
204285
if b.onPacketLoss != nil {
@@ -354,6 +435,7 @@ func (b *Buffer) popSample() []ExtPacket {
354435
func (b *Buffer) popHead() *packet {
355436
c := b.head
356437
b.prevSN = c.extPacket.SequenceNumber
438+
b.prevTS = c.extPacket.Timestamp
357439
b.head = c.next
358440
if b.head == nil {
359441
b.tail = nil

jitter/buffer_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,80 @@ func TestDroppedPackets(t *testing.T) {
232232
})
233233
}
234234

235+
func TestLargeSequenceJump(t *testing.T) {
236+
out := make(chan []ExtPacket, 100)
237+
b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out))
238+
s := newTestStream()
239+
240+
// push some normal packets
241+
for i := 0; i < 10; i++ {
242+
b.Push(s.gen(true, true))
243+
checkSample(t, out, 1)
244+
}
245+
246+
// simulate large sequence jump (should trigger reset)
247+
s.largeSeqJump()
248+
249+
// buffer should reset and accept new packets
250+
for i := 0; i < 10; i++ {
251+
b.Push(s.gen(true, true))
252+
checkSample(t, out, 1)
253+
}
254+
255+
stats := b.Stats()
256+
require.Equal(t, uint64(20), stats.PacketsPushed)
257+
require.Equal(t, uint64(20), stats.PacketsPopped)
258+
require.Equal(t, uint64(20), stats.SamplesPopped)
259+
}
260+
261+
func TestLargeTimestampJump(t *testing.T) {
262+
out := make(chan []ExtPacket, 100)
263+
b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out))
264+
s := &timestampStream{
265+
seq: uint16(rand.Uint32()),
266+
ts: uint32(rand.Uint32()),
267+
}
268+
269+
// push some normal packets
270+
for i := 0; i < 10; i++ {
271+
b.Push(s.gen(true, true))
272+
checkSample(t, out, 1)
273+
}
274+
275+
// simulate large timestamp jump (should trigger reset)
276+
s.largeTimestampJump()
277+
278+
// buffer should reset and accept new packets
279+
for i := 0; i < 10; i++ {
280+
b.Push(s.gen(true, true))
281+
checkSample(t, out, 1)
282+
}
283+
284+
stats := b.Stats()
285+
require.Equal(t, uint64(20), stats.PacketsPushed)
286+
require.Equal(t, uint64(20), stats.PacketsPopped)
287+
require.Equal(t, uint64(20), stats.SamplesPopped)
288+
}
289+
290+
func TestSequenceWraparound(t *testing.T) {
291+
out := make(chan []ExtPacket, 100)
292+
b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out))
293+
s := &stream{
294+
seq: math.MaxUint16 - 5, // start near wrap point
295+
}
296+
297+
// push packets across wraparound boundary
298+
for i := 0; i < 15; i++ {
299+
b.Push(s.gen(true, true))
300+
checkSample(t, out, 1)
301+
}
302+
303+
stats := b.Stats()
304+
require.Equal(t, uint64(15), stats.PacketsPushed)
305+
require.Equal(t, uint64(15), stats.PacketsPopped)
306+
require.Equal(t, uint64(0), stats.PacketsLost)
307+
}
308+
235309
func checkSample(t *testing.T, out chan []ExtPacket, expected int) {
236310
select {
237311
case sample := <-out:
@@ -285,6 +359,36 @@ func (s *stream) discont() {
285359
s.seq += math.MaxUint16 / 2
286360
}
287361

362+
func (s *stream) largeSeqJump() {
363+
s.seq += 2000 // more than MAX_SEQUENCE_JUMP (1000)
364+
}
365+
366+
type timestampStream struct {
367+
seq uint16
368+
ts uint32
369+
}
370+
371+
func (s *timestampStream) gen(head, tail bool) *rtp.Packet {
372+
p := &rtp.Packet{
373+
Header: rtp.Header{
374+
Marker: tail,
375+
SequenceNumber: s.seq,
376+
Timestamp: s.ts,
377+
},
378+
Payload: make([]byte, defaultPacketSize),
379+
}
380+
if head {
381+
copy(p.Payload, headerBytes)
382+
}
383+
s.seq++
384+
s.ts += 160 // typical increment for 20ms at 8kHz
385+
return p
386+
}
387+
388+
func (s *timestampStream) largeTimestampJump() {
389+
s.ts += 8000 * 60 // 60 seconds worth of samples (more than MAX_TIMESTAMP_JUMP)
390+
}
391+
288392
const defaultPacketSize = 200
289393

290394
var headerBytes = []byte{0xaa, 0xaa}

0 commit comments

Comments
 (0)