From 64dfb378434d4e2842b0f6ae69abfa117472d7aa Mon Sep 17 00:00:00 2001 From: Amirmohammad Ghasemi Date: Sun, 9 Nov 2025 16:55:13 -0500 Subject: [PATCH 1/2] ECN support --- pkg/rfc8888/interceptor.go | 13 +++++++++++-- pkg/rfc8888/recorder.go | 4 ++-- pkg/rfc8888/stream_log.go | 6 +++--- pkg/rfc8888/stream_log_test.go | 8 ++++---- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/pkg/rfc8888/interceptor.go b/pkg/rfc8888/interceptor.go index 344ec1be..bf04ae37 100644 --- a/pkg/rfc8888/interceptor.go +++ b/pkg/rfc8888/interceptor.go @@ -73,7 +73,7 @@ type packet struct { arrival time.Time ssrc uint32 sequenceNumber uint16 - ecn uint8 + ecn rtcp.ECN } // BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method @@ -112,11 +112,20 @@ func (s *SenderInterceptor) BindRemoteStream( return 0, nil, err } + var ecn rtcp.ECN + if e, hasECN := attr["ECN"]; hasECN { + if ecnT, ok := e.(rtcp.ECN); ok { + ecn = ecnT + } else { + s.log.Error("ECN entry in attributes map is not of rtcp.ECN type") + } + } + p := packet{ arrival: s.now(), ssrc: header.SSRC, sequenceNumber: header.SequenceNumber, - ecn: 0, // ECN is not supported (yet). + ecn: ecn, } s.packetChan <- p diff --git a/pkg/rfc8888/recorder.go b/pkg/rfc8888/recorder.go index c5e81d46..fa489c8d 100644 --- a/pkg/rfc8888/recorder.go +++ b/pkg/rfc8888/recorder.go @@ -12,7 +12,7 @@ import ( type packetReport struct { arrivalTime time.Time - ecn uint8 + ecn rtcp.ECN } // Recorder records incoming RTP packets and their arrival times. Recorder can @@ -30,7 +30,7 @@ func NewRecorder() *Recorder { } // AddPacket writes a packet to the underlying stream. -func (r *Recorder) AddPacket(ts time.Time, ssrc uint32, seq uint16, ecn uint8) { +func (r *Recorder) AddPacket(ts time.Time, ssrc uint32, seq uint16, ecn rtcp.ECN) { stream, ok := r.streams[ssrc] if !ok { stream = newStreamLog(ssrc) diff --git a/pkg/rfc8888/stream_log.go b/pkg/rfc8888/stream_log.go index ed41f952..bf906139 100644 --- a/pkg/rfc8888/stream_log.go +++ b/pkg/rfc8888/stream_log.go @@ -32,7 +32,7 @@ func newStreamLog(ssrc uint32) *streamLog { } } -func (l *streamLog) add(ts time.Time, sequenceNumber uint16, ecn uint8) { +func (l *streamLog) add(ts time.Time, sequenceNumber uint16, ecn rtcp.ECN) { unwrappedSequenceNumber := l.sequence.Unwrap(sequenceNumber) if !l.init { l.init = true @@ -68,7 +68,7 @@ func (l *streamLog) metricsAfter(reference time.Time, maxReportBlocks int64) rtc gapDetected := false for i := offset; i <= l.lastSequenceNumberReceived; i++ { //nolint:varnamelen // i int64 received := false - ecn := uint8(0) + ecn := rtcp.ECNNonECT ato := uint16(0) if report, ok := l.log[i]; ok { received = true @@ -77,7 +77,7 @@ func (l *streamLog) metricsAfter(reference time.Time, maxReportBlocks int64) rtc } metricBlocks[i-offset] = rtcp.CCFeedbackMetricBlock{ Received: received, - ECN: rtcp.ECN(ecn), + ECN: ecn, ArrivalTimeOffset: ato, } diff --git a/pkg/rfc8888/stream_log_test.go b/pkg/rfc8888/stream_log_test.go index d5ac2c95..7906f34c 100644 --- a/pkg/rfc8888/stream_log_test.go +++ b/pkg/rfc8888/stream_log_test.go @@ -193,7 +193,7 @@ func TestStreamLogAdd(t *testing.T) { t.Run(test.name, func(t *testing.T) { sl := newStreamLog(0) for _, input := range test.inputs { - sl.add(input.ts, input.nr, input.ecn) + sl.add(input.ts, input.nr, rtcp.ECN(input.ecn)) } assert.Equal(t, test.expectedNext, sl.nextSequenceNumberToReport) assert.Equal(t, test.expectedLast, sl.lastSequenceNumberReceived) @@ -552,7 +552,7 @@ func TestStreamLogMetricsAfter(t *testing.T) { t.Run(test.name, func(t *testing.T) { sl := newStreamLog(0) for _, input := range test.inputs { - sl.add(input.ts, input.nr, input.ecn) + sl.add(input.ts, input.nr, rtcp.ECN(input.ecn)) } assert.Equal(t, test.expectedNextBefore, sl.nextSequenceNumberToReport) @@ -572,11 +572,11 @@ func TestStreamLogMetricsAfter(t *testing.T) { func TestRemoveOldestPackets(t *testing.T) { sl := newStreamLog(0) - sl.add(time.Time{}.Add(time.Second), 1, 0) + sl.add(time.Time{}.Add(time.Second), 1, rtcp.ECN(0)) now := time.Now().Add(10 * time.Second) for i := 2; i < 16386; i++ { now = now.Add(10 * time.Millisecond) - sl.add(now, uint16(i), 0) //nolint:gosec // G115 + sl.add(now, uint16(i), rtcp.ECN(0)) //nolint:gosec // G115 } metrics := sl.metricsAfter(now, maxReportsPerReportBlock) assert.Equal(t, uint16(2), metrics.BeginSequence) From 7f6364bfa44530b712347292a15e5624696f40b1 Mon Sep 17 00:00:00 2001 From: Amirmohammad Ghasemi Date: Fri, 14 Nov 2025 14:27:05 -0500 Subject: [PATCH 2/2] Adapt with transport changes --- pkg/rfc8888/interceptor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/rfc8888/interceptor.go b/pkg/rfc8888/interceptor.go index bf04ae37..1329436a 100644 --- a/pkg/rfc8888/interceptor.go +++ b/pkg/rfc8888/interceptor.go @@ -114,10 +114,10 @@ func (s *SenderInterceptor) BindRemoteStream( var ecn rtcp.ECN if e, hasECN := attr["ECN"]; hasECN { - if ecnT, ok := e.(rtcp.ECN); ok { - ecn = ecnT + if ecnT, ok := e.(byte); ok { + ecn = rtcp.ECN(ecnT) } else { - s.log.Error("ECN entry in attributes map is not of rtcp.ECN type") + s.log.Error("ECN entry in attributes map is not of type byte") } }