Skip to content

Commit c9a6d2c

Browse files
committed
Forward RTCP packets from receiver to sender
Signed-off-by: Šimon Brandner <[email protected]>
1 parent b64edee commit c9a6d2c

File tree

3 files changed

+88
-59
lines changed

3 files changed

+88
-59
lines changed

src/publisher.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,19 @@ import (
2020
"errors"
2121
"io"
2222
"sync"
23+
"sync/atomic"
24+
"time"
2325

26+
"github.com/pion/rtcp"
2427
"github.com/pion/webrtc/v3"
2528
"github.com/sirupsen/logrus"
2629
"maunium.net/go/mautrix/event"
2730
)
2831

29-
const bufferSize = 1500
32+
const (
33+
minimalPLIInterval = time.Millisecond * 500
34+
bufferSize = 1500
35+
)
3036

3137
type Publisher struct {
3238
Track *webrtc.TrackRemote
@@ -35,6 +41,8 @@ type Publisher struct {
3541
mutex sync.RWMutex
3642
logger *logrus.Entry
3743
subscribers []*Subscriber
44+
45+
lastPLI atomic.Int64
3846
}
3947

4048
func NewPublisher(
@@ -57,7 +65,6 @@ func NewPublisher(
5765
call.Publishers = append(call.Publishers, publisher)
5866
call.mutex.Unlock()
5967

60-
go WriteRTCP(track, call.PeerConnection, publisher.logger)
6168
go publisher.WriteToSubscribers()
6269

6370
publisher.logger.Info("published track")
@@ -118,6 +125,36 @@ func (p *Publisher) Matches(trackDescription event.SFUTrackDescription) bool {
118125
return true
119126
}
120127

128+
func (p *Publisher) WriteRTCP(packets []rtcp.Packet) {
129+
packetsToSend := []rtcp.Packet{}
130+
131+
for _, packet := range packets {
132+
// Since we sometimes spam the sender with PLIs, make sure we don't send
133+
// them way too often
134+
if _, ok := packet.(*rtcp.PictureLossIndication); ok {
135+
if time.Now().UnixNano()-p.lastPLI.Load() < minimalPLIInterval.Nanoseconds() {
136+
continue
137+
}
138+
139+
p.lastPLI.Store(time.Now().UnixNano())
140+
}
141+
142+
packetsToSend = append(packetsToSend, packet)
143+
}
144+
145+
if len(packetsToSend) < 1 {
146+
return
147+
}
148+
149+
if err := p.Call.PeerConnection.WriteRTCP(packetsToSend); err != nil {
150+
if errors.Is(err, io.ErrClosedPipe) {
151+
return
152+
}
153+
154+
p.logger.WithError(err).Warn("failed to write RTCP on track")
155+
}
156+
}
157+
121158
func (p *Publisher) WriteToSubscribers() {
122159
buff := make([]byte, bufferSize)
123160

src/subscriber.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"errors"
21+
"io"
2022
"sync"
2123

24+
"github.com/pion/rtcp"
2225
"github.com/pion/webrtc/v3"
2326
"github.com/sirupsen/logrus"
2427
)
@@ -31,6 +34,8 @@ type Subscriber struct {
3134
call *Call
3235
sender *webrtc.RTPSender
3336
publisher *Publisher
37+
38+
ssrc uint32
3439
}
3540

3641
func NewSubscriber(call *Call) *Subscriber {
@@ -76,9 +81,12 @@ func (s *Subscriber) Subscribe(publisher *Publisher) {
7681
s.mutex.Lock()
7782
s.Track = track
7883
s.sender = sender
84+
s.ssrc = uint32(sender.GetParameters().Encodings[0].SSRC)
7985
s.publisher = publisher
8086
s.mutex.Unlock()
8187

88+
go s.writeRTCP()
89+
8290
publisher.AddSubscriber(s)
8391

8492
s.logger.Info("subscribed")
@@ -104,3 +112,44 @@ func (s *Subscriber) Unsubscribe() {
104112

105113
s.logger.Info("unsubscribed")
106114
}
115+
116+
func (s *Subscriber) writeRTCP() {
117+
if s.Track.Kind() != webrtc.RTPCodecTypeVideo {
118+
return
119+
}
120+
121+
for {
122+
packets, _, err := s.sender.ReadRTCP()
123+
if err != nil {
124+
if errors.Is(err, io.ErrClosedPipe) || errors.Is(err, io.EOF) {
125+
return
126+
}
127+
128+
s.logger.WithError(err).Warn("failed to read RTCP on track")
129+
}
130+
131+
packetsToForward := []rtcp.Packet{}
132+
readSSRC := uint32(s.publisher.Track.SSRC())
133+
134+
for _, packet := range packets {
135+
switch typedPacket := packet.(type) {
136+
// We mung the packets here, so that the SSRCs match what the
137+
// receiver expects
138+
case *rtcp.PictureLossIndication:
139+
typedPacket.SenderSSRC = s.ssrc
140+
typedPacket.MediaSSRC = readSSRC
141+
packetsToForward = append(packetsToForward, typedPacket)
142+
case *rtcp.FullIntraRequest:
143+
typedPacket.SenderSSRC = s.ssrc
144+
typedPacket.MediaSSRC = readSSRC
145+
packetsToForward = append(packetsToForward, typedPacket)
146+
}
147+
}
148+
149+
if len(packetsToForward) < 1 {
150+
continue
151+
}
152+
153+
s.publisher.WriteRTCP(packetsToForward)
154+
}
155+
}

src/utils.go

Lines changed: 0 additions & 57 deletions
This file was deleted.

0 commit comments

Comments
 (0)