Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 110 additions & 1 deletion jitter/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package jitter

import (
"math"
"sync"
"time"

Expand Down Expand Up @@ -43,6 +44,7 @@ type Buffer struct {

initialized bool
prevSN uint16
prevTS uint32
head *packet
tail *packet

Expand All @@ -51,6 +53,9 @@ type Buffer struct {

pool *packet
size int

maxSequenceJump *uint16
maxTimestampJump *uint32
}

type Option func(*Buffer)
Expand Down Expand Up @@ -112,6 +117,18 @@ func WithPacketLossHandler(handler func()) Option {
}
}

func WithMaxSequenceJump(max uint16) Option {
return func(b *Buffer) {
b.maxSequenceJump = &max
}
}

func WithMaxTimestampJump(max uint32) Option {
return func(b *Buffer) {
b.maxTimestampJump = &max
}
}

func (b *Buffer) WithLogger(logger logger.Logger) *Buffer {
b.logger = logger
return b
Expand Down Expand Up @@ -187,6 +204,86 @@ func (b *Buffer) Close() {
b.closed.Break()
}

func (b *Buffer) isLargeTimestampJump(current, prev uint32) bool {
if b.maxTimestampJump == nil || !b.initialized {
return false
}

maxJump := uint32(*b.maxTimestampJump)

cur := int64(current)
prv := int64(prev)

forwardDiff := cur - prv
if forwardDiff < 0 {
forwardDiff += int64(math.MaxUint32) + 1
}

backwardDiff := prv - cur
if backwardDiff < 0 {
backwardDiff += int64(math.MaxUint32) + 1
}

return min(backwardDiff, forwardDiff) > int64(maxJump)
}

func (b *Buffer) isLargeSequenceJump(current, prev uint16) bool {
if b.maxSequenceJump == nil || !b.initialized {
return false
}

maxJump := int32(*b.maxSequenceJump)

cur := int32(current)
prv := int32(prev)

forwardDiff := cur - prv
if forwardDiff < 0 {
forwardDiff += int32(math.MaxUint16) + 1
}

backwardDiff := prv - cur
if backwardDiff < 0 {
backwardDiff += int32(math.MaxUint16) + 1
}

return min(backwardDiff, forwardDiff) > maxJump
}

func (b *Buffer) reset() {
b.logger.Infow("resetting jitter buffer due to RTP discontinuity")

dropped := 0
for b.head != nil {
next := b.head.next
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point packets are dropped - might be good to update PacketsDropped stats field and invoke packet loss callback which API consumer might use to hook up e.g sending packet loss indicator.

if !b.head.extPacket.Padding {
dropped++
}
b.free(b.head)
b.head = next
}
b.tail = nil

if dropped > 0 {
b.stats.PacketsDropped += uint64(dropped)
if b.onPacketLoss != nil {
Copy link
Contributor

@milos-lk milos-lk Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this again - if there was a huge gap in seen timestamps or sequence numbers it might be good to call onPacketLoss callback regardless of whether we actually had and dropped packets. It might be more robust to move it outside of if dropped > 0 block.

b.onPacketLoss()
}
}

b.initialized = false
b.prevSN = 0
b.prevTS = 0

if !b.timer.Stop() {
select {
case <-b.timer.C:
default:
}
}
b.timer.Reset(b.latency)
}

// push adds a packet to the buffer
func (b *Buffer) push(pkt *rtp.Packet, receivedAt time.Time) {
b.stats.PacketsPushed++
Expand All @@ -197,8 +294,19 @@ func (b *Buffer) push(pkt *rtp.Packet, receivedAt time.Time) {
}
}

if b.isLargeTimestampJump(pkt.Timestamp, b.prevTS) ||
b.isLargeSequenceJump(pkt.SequenceNumber, b.prevSN) {
b.logger.Infow("large RTP discontinuity detected",
"current_ts", pkt.Timestamp,
"prev_ts", b.prevTS,
"current_sn", pkt.SequenceNumber,
"prev_sn", b.prevSN,
)
b.reset()
}

if b.initialized && before(pkt.SequenceNumber, b.prevSN) {
// packet expired
// packet expired (not after discontinuity reset)
if !pkt.Padding {
b.stats.PacketsDropped++
if b.onPacketLoss != nil {
Expand Down Expand Up @@ -354,6 +462,7 @@ func (b *Buffer) popSample() []ExtPacket {
func (b *Buffer) popHead() *packet {
c := b.head
b.prevSN = c.extPacket.SequenceNumber
b.prevTS = c.extPacket.Timestamp
b.head = c.next
if b.head == nil {
b.tail = nil
Expand Down
104 changes: 104 additions & 0 deletions jitter/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,80 @@ func TestDroppedPackets(t *testing.T) {
})
}

func TestLargeSequenceJump(t *testing.T) {
out := make(chan []ExtPacket, 100)
b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out), WithMaxSequenceJump(1000))
s := newTestStream()

// push some normal packets
for i := 0; i < 10; i++ {
b.Push(s.gen(true, true))
checkSample(t, out, 1)
}

// simulate large sequence jump (should trigger reset)
s.largeSeqJump()

// buffer should reset and accept new packets
for i := 0; i < 10; i++ {
b.Push(s.gen(true, true))
checkSample(t, out, 1)
}

stats := b.Stats()
require.Equal(t, uint64(20), stats.PacketsPushed)
require.Equal(t, uint64(20), stats.PacketsPopped)
require.Equal(t, uint64(20), stats.SamplesPopped)
}

func TestLargeTimestampJump(t *testing.T) {
out := make(chan []ExtPacket, 100)
b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out), WithMaxTimestampJump(48000*30))
s := &timestampStream{
seq: uint16(rand.Uint32()),
ts: uint32(rand.Uint32()),
}

// push some normal packets
for i := 0; i < 10; i++ {
b.Push(s.gen(true, true))
checkSample(t, out, 1)
}

// simulate large timestamp jump (should trigger reset)
s.largeTimestampJump()

// buffer should reset and accept new packets
for i := 0; i < 10; i++ {
b.Push(s.gen(true, true))
checkSample(t, out, 1)
}

stats := b.Stats()
require.Equal(t, uint64(20), stats.PacketsPushed)
require.Equal(t, uint64(20), stats.PacketsPopped)
require.Equal(t, uint64(20), stats.SamplesPopped)
}

func TestSequenceWraparound(t *testing.T) {
out := make(chan []ExtPacket, 100)
b := NewBuffer(&testDepacketizer{}, testBufferLatency, chanFunc(t, out))
s := &stream{
seq: math.MaxUint16 - 5, // start near wrap point
}

// push packets across wraparound boundary
for i := 0; i < 15; i++ {
b.Push(s.gen(true, true))
checkSample(t, out, 1)
}

stats := b.Stats()
require.Equal(t, uint64(15), stats.PacketsPushed)
require.Equal(t, uint64(15), stats.PacketsPopped)
require.Equal(t, uint64(0), stats.PacketsLost)
}

func checkSample(t *testing.T, out chan []ExtPacket, expected int) {
select {
case sample := <-out:
Expand Down Expand Up @@ -285,6 +359,36 @@ func (s *stream) discont() {
s.seq += math.MaxUint16 / 2
}

func (s *stream) largeSeqJump() {
s.seq += 2000 // more than MAX_SEQUENCE_JUMP (1000)
}

type timestampStream struct {
seq uint16
ts uint32
}

func (s *timestampStream) gen(head, tail bool) *rtp.Packet {
p := &rtp.Packet{
Header: rtp.Header{
Marker: tail,
SequenceNumber: s.seq,
Timestamp: s.ts,
},
Payload: make([]byte, defaultPacketSize),
}
if head {
copy(p.Payload, headerBytes)
}
s.seq++
s.ts += 960 // typical increment for 20ms at 48kHz
return p
}

func (s *timestampStream) largeTimestampJump() {
s.ts += 8000 * 60 // 60 seconds worth of samples (more than MAX_TIMESTAMP_JUMP)
}

const defaultPacketSize = 200

var headerBytes = []byte{0xaa, 0xaa}
Expand Down
Loading