Skip to content

Commit 7a76905

Browse files
Merge pull request #45 from matrix-org/SimonBrandner/feat/forward-rtcp
2 parents 6af99c8 + 8dd122a commit 7a76905

File tree

3 files changed

+79
-59
lines changed

3 files changed

+79
-59
lines changed

src/publisher.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,19 @@ import (
2020
"errors"
2121
"io"
2222
"sync"
23+
"sync/atomic"
24+
"time"
2325

26+
"github.com/pion/rtcp"
2427
"github.com/pion/webrtc/v3"
2528
"github.com/sirupsen/logrus"
2629
"maunium.net/go/mautrix/event"
2730
)
2831

29-
const bufferSize = 1500
32+
const (
33+
minimalPLIInterval = time.Millisecond * 500
34+
bufferSize = 1500
35+
)
3036

3137
type Publisher struct {
3238
Track *webrtc.TrackRemote
@@ -35,6 +41,8 @@ type Publisher struct {
3541
mutex sync.RWMutex
3642
logger *logrus.Entry
3743
subscribers []*Subscriber
44+
45+
lastPLI atomic.Int64
3846
}
3947

4048
func NewPublisher(
@@ -57,7 +65,6 @@ func NewPublisher(
5765
call.Publishers = append(call.Publishers, publisher)
5866
call.mutex.Unlock()
5967

60-
go WriteRTCP(track, call.PeerConnection, publisher.logger)
6168
go publisher.WriteToSubscribers()
6269

6370
publisher.logger.Info("published track")
@@ -118,6 +125,46 @@ func (p *Publisher) Matches(trackDescription event.SFUTrackDescription) bool {
118125
return true
119126
}
120127

128+
func (p *Publisher) WriteRTCP(packets []rtcp.Packet) {
129+
packetsToSend := []rtcp.Packet{}
130+
readSSRC := uint32(p.Track.SSRC())
131+
132+
for _, packet := range packets {
133+
switch typedPacket := packet.(type) {
134+
// We mung the packets here, so that the SSRCs match what the
135+
// receiver expects:
136+
// The media SSRC is the SSRC of the media about which the packet is
137+
// reporting; therefore, we mung it to be the SSRC of the publishing
138+
// participant's track. Without this, it would be SSRC of the SFU's
139+
// track which isn't right
140+
case *rtcp.PictureLossIndication:
141+
// Since we sometimes spam the sender with PLIs, make sure we don't send
142+
// them way too often
143+
if time.Now().UnixNano()-p.lastPLI.Load() < minimalPLIInterval.Nanoseconds() {
144+
continue
145+
}
146+
147+
p.lastPLI.Store(time.Now().UnixNano())
148+
149+
typedPacket.MediaSSRC = readSSRC
150+
packetsToSend = append(packetsToSend, typedPacket)
151+
case *rtcp.FullIntraRequest:
152+
typedPacket.MediaSSRC = readSSRC
153+
packetsToSend = append(packetsToSend, typedPacket)
154+
}
155+
156+
packetsToSend = append(packetsToSend, packet)
157+
}
158+
159+
if len(packetsToSend) != 0 {
160+
if err := p.Call.PeerConnection.WriteRTCP(packetsToSend); err != nil {
161+
if !errors.Is(err, io.ErrClosedPipe) {
162+
p.logger.WithError(err).Warn("failed to write RTCP on track")
163+
}
164+
}
165+
}
166+
}
167+
121168
func (p *Publisher) WriteToSubscribers() {
122169
buff := make([]byte, bufferSize)
123170

src/subscriber.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"errors"
21+
"io"
2022
"sync"
2123

2224
"github.com/pion/webrtc/v3"
@@ -59,6 +61,10 @@ func (s *Subscriber) initLoggingWithTrack(track *webrtc.TrackRemote) {
5961
func (s *Subscriber) Subscribe(publisher *Publisher) {
6062
s.initLoggingWithTrack(publisher.Track)
6163

64+
if s.publisher != nil {
65+
s.logger.Error("cannot subscribe, if we already are")
66+
}
67+
6268
track, err := webrtc.NewTrackLocalStaticRTP(
6369
publisher.Track.Codec().RTPCodecCapability,
6470
publisher.Track.ID(),
@@ -79,6 +85,10 @@ func (s *Subscriber) Subscribe(publisher *Publisher) {
7985
s.publisher = publisher
8086
s.mutex.Unlock()
8187

88+
if s.Track.Kind() == webrtc.RTPCodecTypeVideo {
89+
go s.forwardRTCP()
90+
}
91+
8292
publisher.AddSubscriber(s)
8393

8494
s.logger.Info("subscribed")
@@ -104,3 +114,23 @@ func (s *Subscriber) Unsubscribe() {
104114

105115
s.logger.Info("unsubscribed")
106116
}
117+
118+
func (s *Subscriber) forwardRTCP() {
119+
for {
120+
// If we unsubscribed, stop forwarding RTCP packets
121+
if s.publisher == nil {
122+
return
123+
}
124+
125+
packets, _, err := s.sender.ReadRTCP()
126+
if err != nil {
127+
if errors.Is(err, io.ErrClosedPipe) || errors.Is(err, io.EOF) {
128+
return
129+
}
130+
131+
s.logger.WithError(err).Warn("failed to read RTCP on track")
132+
}
133+
134+
s.publisher.WriteRTCP(packets)
135+
}
136+
}

src/utils.go

Lines changed: 0 additions & 57 deletions
This file was deleted.

0 commit comments

Comments
 (0)