diff --git a/pkg/rfc8888/interceptor.go b/pkg/rfc8888/interceptor.go index 344ec1be..1329436a 100644 --- a/pkg/rfc8888/interceptor.go +++ b/pkg/rfc8888/interceptor.go @@ -73,7 +73,7 @@ type packet struct { arrival time.Time ssrc uint32 sequenceNumber uint16 - ecn uint8 + ecn rtcp.ECN } // BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method @@ -112,11 +112,20 @@ func (s *SenderInterceptor) BindRemoteStream( return 0, nil, err } + var ecn rtcp.ECN + if e, hasECN := attr["ECN"]; hasECN { + if ecnT, ok := e.(byte); ok { + ecn = rtcp.ECN(ecnT) + } else { + s.log.Error("ECN entry in attributes map is not of type byte") + } + } + p := packet{ arrival: s.now(), ssrc: header.SSRC, sequenceNumber: header.SequenceNumber, - ecn: 0, // ECN is not supported (yet). + ecn: ecn, } s.packetChan <- p diff --git a/pkg/rfc8888/recorder.go b/pkg/rfc8888/recorder.go index c5e81d46..fa489c8d 100644 --- a/pkg/rfc8888/recorder.go +++ b/pkg/rfc8888/recorder.go @@ -12,7 +12,7 @@ import ( type packetReport struct { arrivalTime time.Time - ecn uint8 + ecn rtcp.ECN } // Recorder records incoming RTP packets and their arrival times. Recorder can @@ -30,7 +30,7 @@ func NewRecorder() *Recorder { } // AddPacket writes a packet to the underlying stream. -func (r *Recorder) AddPacket(ts time.Time, ssrc uint32, seq uint16, ecn uint8) { +func (r *Recorder) AddPacket(ts time.Time, ssrc uint32, seq uint16, ecn rtcp.ECN) { stream, ok := r.streams[ssrc] if !ok { stream = newStreamLog(ssrc) diff --git a/pkg/rfc8888/stream_log.go b/pkg/rfc8888/stream_log.go index ed41f952..bf906139 100644 --- a/pkg/rfc8888/stream_log.go +++ b/pkg/rfc8888/stream_log.go @@ -32,7 +32,7 @@ func newStreamLog(ssrc uint32) *streamLog { } } -func (l *streamLog) add(ts time.Time, sequenceNumber uint16, ecn uint8) { +func (l *streamLog) add(ts time.Time, sequenceNumber uint16, ecn rtcp.ECN) { unwrappedSequenceNumber := l.sequence.Unwrap(sequenceNumber) if !l.init { l.init = true @@ -68,7 +68,7 @@ func (l *streamLog) metricsAfter(reference time.Time, maxReportBlocks int64) rtc gapDetected := false for i := offset; i <= l.lastSequenceNumberReceived; i++ { //nolint:varnamelen // i int64 received := false - ecn := uint8(0) + ecn := rtcp.ECNNonECT ato := uint16(0) if report, ok := l.log[i]; ok { received = true @@ -77,7 +77,7 @@ func (l *streamLog) metricsAfter(reference time.Time, maxReportBlocks int64) rtc } metricBlocks[i-offset] = rtcp.CCFeedbackMetricBlock{ Received: received, - ECN: rtcp.ECN(ecn), + ECN: ecn, ArrivalTimeOffset: ato, } diff --git a/pkg/rfc8888/stream_log_test.go b/pkg/rfc8888/stream_log_test.go index d5ac2c95..7906f34c 100644 --- a/pkg/rfc8888/stream_log_test.go +++ b/pkg/rfc8888/stream_log_test.go @@ -193,7 +193,7 @@ func TestStreamLogAdd(t *testing.T) { t.Run(test.name, func(t *testing.T) { sl := newStreamLog(0) for _, input := range test.inputs { - sl.add(input.ts, input.nr, input.ecn) + sl.add(input.ts, input.nr, rtcp.ECN(input.ecn)) } assert.Equal(t, test.expectedNext, sl.nextSequenceNumberToReport) assert.Equal(t, test.expectedLast, sl.lastSequenceNumberReceived) @@ -552,7 +552,7 @@ func TestStreamLogMetricsAfter(t *testing.T) { t.Run(test.name, func(t *testing.T) { sl := newStreamLog(0) for _, input := range test.inputs { - sl.add(input.ts, input.nr, input.ecn) + sl.add(input.ts, input.nr, rtcp.ECN(input.ecn)) } assert.Equal(t, test.expectedNextBefore, sl.nextSequenceNumberToReport) @@ -572,11 +572,11 @@ func TestStreamLogMetricsAfter(t *testing.T) { func TestRemoveOldestPackets(t *testing.T) { sl := newStreamLog(0) - sl.add(time.Time{}.Add(time.Second), 1, 0) + sl.add(time.Time{}.Add(time.Second), 1, rtcp.ECN(0)) now := time.Now().Add(10 * time.Second) for i := 2; i < 16386; i++ { now = now.Add(10 * time.Millisecond) - sl.add(now, uint16(i), 0) //nolint:gosec // G115 + sl.add(now, uint16(i), rtcp.ECN(0)) //nolint:gosec // G115 } metrics := sl.metricsAfter(now, maxReportsPerReportBlock) assert.Equal(t, uint16(2), metrics.BeginSequence)