Skip to content

Commit 8350bef

Browse files
committed
Change event shape to match MSC3898
Signed-off-by: Šimon Brandner <[email protected]>
1 parent 9679c7f commit 8350bef

File tree

10 files changed

+157
-75
lines changed

10 files changed

+157
-75
lines changed

pkg/conference/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package conference
22

33
// Configuration for the group conferences (calls).
44
type Config struct {
5-
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
6-
// from the client for this duration, the connection is considered dead (in seconds).
5+
// Keep-alive timeout for WebRTC connections. If the client doesn't respond
6+
// to an `m.call.ping` with an `m.call.pong` for this amount of time, the
7+
// connection is considered dead. (in seconds, no greater then 30)
78
KeepAliveTimeout int `yaml:"timeout"`
9+
// The time after which we should send another m.call.ping event to the
10+
// client. (in seconds, greater then 30)
11+
PingInterval int `yaml:"pingInterval"`
812
}

pkg/conference/data_channel_message_processor.go

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,30 @@ package conference
22

33
import (
44
"github.com/pion/webrtc/v3"
5+
"github.com/sirupsen/logrus"
56
"golang.org/x/exp/slices"
67
"maunium.net/go/mautrix/event"
78
)
89

9-
// Handle the `SFUMessage` event from the DataChannel message.
10-
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
11-
participant.logger.Info("Received select request over DC")
10+
// Handle the `FocusEvent` from the DataChannel message.
11+
func (c *Conference) processTrackSubscriptionDCMessage(
12+
participant *Participant, msg event.FocusCallTrackSubscriptionEventContent,
13+
) {
14+
participant.logger.Info("Received track subscription request over DC")
1215

1316
// Find tracks based on what we were asked for.
14-
tracks := c.getTracks(msg.Start)
17+
tracks := c.getTracks(msg.Subscribe)
18+
19+
participant.logger.WithFields(logrus.Fields{
20+
"tracks_we_got": tracks,
21+
"tracks_we_want": msg,
22+
}).Debug("Tracks to subscribe to")
1523

1624
// Let's check if we have all the tracks that we were asked for are there.
1725
// If not, we will list which are not available (later on we must inform participant
1826
// about it unless the participant retries it).
19-
if len(tracks) != len(msg.Start) {
20-
for _, expected := range msg.Start {
27+
if len(tracks) != len(msg.Subscribe) {
28+
for _, expected := range msg.Subscribe {
2129
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
2230
return track.ID() == expected.TrackID
2331
})
@@ -28,47 +36,61 @@ func (c *Conference) processSelectDCMessage(participant *Participant, msg event.
2836
}
2937
}
3038

31-
// Subscribe to the found tracks.
39+
// Subscribe to the found tracks
3240
for _, track := range tracks {
41+
participant.logger.WithField("track_id", track.ID()).Debug("Subscribing to track")
3342
if err := participant.peer.SubscribeTo(track); err != nil {
3443
participant.logger.Errorf("Failed to subscribe to track: %v", err)
3544
return
3645
}
3746
}
38-
}
39-
40-
func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
41-
participant.logger.Info("Received SDP answer over DC")
4247

43-
if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
44-
participant.logger.Errorf("Failed to set SDP answer: %v", err)
45-
return
46-
}
48+
// TODO: Handle unsubscribe
4749
}
4850

49-
func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
50-
participant.logger.Info("Received SDP offer over DC")
51+
func (c *Conference) processNegotiateDCMessage(participant *Participant, msg event.FocusCallNegotiateEventContent) {
52+
participant.streamMetadata = msg.SDPStreamMetadata
5153

52-
answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
53-
if err != nil {
54-
participant.logger.Errorf("Failed to set SDP offer: %v", err)
55-
return
56-
}
54+
if msg.Description.Type == event.CallDataTypeOffer {
55+
participant.logger.Info("Received SDP offer over DC")
5756

58-
participant.streamMetadata = msg.Metadata
57+
answer, err := participant.peer.ProcessSDPOffer(msg.Description.SDP)
58+
if err != nil {
59+
participant.logger.Errorf("Failed to set SDP offer: %v", err)
60+
return
61+
}
62+
63+
participant.sendDataChannelMessage(event.Event{
64+
Type: event.FocusCallNegotiate,
65+
Content: event.Content{
66+
Parsed: event.FocusCallNegotiateEventContent{
67+
Description: event.CallData{
68+
Type: event.CallDataType(answer.Type.String()),
69+
SDP: answer.SDP,
70+
},
71+
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
72+
},
73+
},
74+
})
75+
} else if msg.Description.Type == event.CallDataTypeAnswer {
76+
participant.logger.Info("Received SDP answer over DC")
5977

60-
participant.sendDataChannelMessage(event.SFUMessage{
61-
Op: event.SFUOperationAnswer,
62-
SDP: answer.SDP,
63-
Metadata: c.getAvailableStreamsFor(participant.id),
64-
})
78+
if err := participant.peer.ProcessSDPAnswer(msg.Description.SDP); err != nil {
79+
participant.logger.Errorf("Failed to set SDP answer: %v", err)
80+
return
81+
}
82+
} else {
83+
participant.logger.Errorf("Unknown SDP description type")
84+
}
6585
}
6686

67-
func (c *Conference) processAliveDCMessage(participant *Participant) {
68-
participant.peer.ProcessHeartbeat()
87+
func (c *Conference) processPongDCMessage(participant *Participant) {
88+
participant.peer.ProcessPong()
6989
}
7090

71-
func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
72-
participant.streamMetadata = msg.Metadata
91+
func (c *Conference) processMetadataDCMessage(
92+
participant *Participant, msg event.FocusCallSDPStreamMetadataChangedEventContent,
93+
) {
94+
participant.streamMetadata = msg.SDPStreamMetadata
7395
c.resendMetadataToAllExcept(participant.id)
7496
}

pkg/conference/matrix_message_processor.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,21 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
5353
messageSink := common.NewMessageSink(participantID, c.peerMessages)
5454

5555
keepAliveDeadline := time.Duration(c.config.KeepAliveTimeout) * time.Second
56-
peer, answer, err := peer.NewPeer(inviteEvent.Offer.SDP, messageSink, logger, keepAliveDeadline)
56+
pingInterval := time.Duration(c.config.PingInterval) * time.Second
57+
sendPing := func() {
58+
participant.sendDataChannelMessage(event.Event{
59+
Type: event.FocusCallPing,
60+
Content: event.Content{},
61+
})
62+
}
63+
peer, answer, err := peer.NewPeer(
64+
inviteEvent.Offer.SDP,
65+
messageSink,
66+
logger,
67+
pingInterval,
68+
keepAliveDeadline,
69+
sendPing,
70+
)
5771
if err != nil {
5872
logger.WithError(err).Errorf("Failed to process SDP offer")
5973
return err

pkg/conference/participant.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package conference
22

33
import (
4-
"encoding/json"
54
"time"
65

76
"github.com/matrix-org/waterfall/pkg/peer"
@@ -46,8 +45,8 @@ func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient {
4645
}
4746
}
4847

49-
func (p *Participant) sendDataChannelMessage(toSend event.SFUMessage) {
50-
jsonToSend, err := json.Marshal(toSend)
48+
func (p *Participant) sendDataChannelMessage(toSend event.Event) {
49+
jsonToSend, err := toSend.MarshalJSON()
5150
if err != nil {
5251
p.logger.Error("Failed to marshal data channel message")
5352
return

pkg/conference/peer_message_processor.go

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package conference
22

33
import (
4-
"encoding/json"
54
"time"
65

76
"github.com/matrix-org/waterfall/pkg/peer"
@@ -68,40 +67,55 @@ func (c *Conference) processICEGatheringCompleteMessage(participant *Participant
6867

6968
func (c *Conference) processRenegotiationRequiredMessage(participant *Participant, msg peer.RenegotiationRequired) {
7069
participant.logger.Info("Started renegotiation")
71-
participant.sendDataChannelMessage(event.SFUMessage{
72-
Op: event.SFUOperationOffer,
73-
SDP: msg.Offer.SDP,
74-
Metadata: c.getAvailableStreamsFor(participant.id),
70+
participant.sendDataChannelMessage(event.Event{
71+
Type: event.FocusCallNegotiate,
72+
Content: event.Content{
73+
Parsed: event.FocusCallNegotiateEventContent{
74+
Description: event.CallData{
75+
Type: event.CallDataType(msg.Offer.Type.String()),
76+
SDP: msg.Offer.SDP,
77+
},
78+
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
79+
},
80+
},
7581
})
7682
}
7783

7884
func (c *Conference) processDataChannelMessage(participant *Participant, msg peer.DataChannelMessage) {
7985
participant.logger.Debug("Received data channel message")
80-
var sfuMessage event.SFUMessage
81-
if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil {
86+
var focusEvent event.Event
87+
if err := focusEvent.UnmarshalJSON([]byte(msg.Message)); err != nil {
8288
c.logger.Errorf("Failed to unmarshal SFU message: %v", err)
8389
return
8490
}
8591

86-
switch sfuMessage.Op {
87-
case event.SFUOperationSelect:
88-
c.processSelectDCMessage(participant, sfuMessage)
89-
case event.SFUOperationAnswer:
90-
c.processAnswerDCMessage(participant, sfuMessage)
91-
case event.SFUOperationPublish, event.SFUOperationUnpublish:
92-
c.processPublishDCMessage(participant, sfuMessage)
93-
case event.SFUOperationAlive:
94-
c.processAliveDCMessage(participant)
95-
case event.SFUOperationMetadata:
96-
c.processMetadataDCMessage(participant, sfuMessage)
92+
switch focusEvent.Type.Type {
93+
case event.FocusCallTrackSubscription.Type:
94+
focusEvent.Content.ParseRaw(event.FocusCallTrackSubscription)
95+
c.processTrackSubscriptionDCMessage(participant, *focusEvent.Content.AsFocusCallTrackSubscription())
96+
case event.FocusCallNegotiate.Type:
97+
focusEvent.Content.ParseRaw(event.FocusCallNegotiate)
98+
c.processNegotiateDCMessage(participant, *focusEvent.Content.AsFocusCallNegotiate())
99+
case event.FocusCallPong.Type:
100+
focusEvent.Content.ParseRaw(event.FocusCallPong)
101+
c.processPongDCMessage(participant)
102+
case event.FocusCallSDPStreamMetadataChanged.Type:
103+
focusEvent.Content.ParseRaw(event.FocusCallSDPStreamMetadataChanged)
104+
c.processMetadataDCMessage(participant, *focusEvent.Content.AsFocusCallSDPStreamMetadataChanged())
105+
default:
106+
participant.logger.WithField("type", focusEvent.Type.Type).Warn("Received data channel message of unknown type")
97107
}
98108
}
99109

100110
func (c *Conference) processDataChannelAvailableMessage(participant *Participant, msg peer.DataChannelAvailable) {
101111
participant.logger.Info("Connected data channel")
102-
participant.sendDataChannelMessage(event.SFUMessage{
103-
Op: event.SFUOperationMetadata,
104-
Metadata: c.getAvailableStreamsFor(participant.id),
112+
participant.sendDataChannelMessage(event.Event{
113+
Type: event.FocusCallSDPStreamMetadataChanged,
114+
Content: event.Content{
115+
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
116+
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
117+
},
118+
},
105119
})
106120
}
107121

pkg/conference/state.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (c *Conference) getAvailableStreamsFor(forParticipant ParticipantID) event.
9494
}
9595

9696
// Helper that returns the list of streams inside this conference that match the given stream IDs and track IDs.
97-
func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrtc.TrackLocalStaticRTP {
97+
func (c *Conference) getTracks(identifiers []event.FocusTrackDescription) []*webrtc.TrackLocalStaticRTP {
9898
tracks := make([]*webrtc.TrackLocalStaticRTP, 0)
9999
for _, participant := range c.participants {
100100
// Check if this participant has any of the tracks that we're looking for.
@@ -112,9 +112,13 @@ func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrt
112112
func (c *Conference) resendMetadataToAllExcept(exceptMe ParticipantID) {
113113
for participantID, participant := range c.participants {
114114
if participantID != exceptMe {
115-
participant.sendDataChannelMessage(event.SFUMessage{
116-
Op: event.SFUOperationMetadata,
117-
Metadata: c.getAvailableStreamsFor(participantID),
115+
participant.sendDataChannelMessage(event.Event{
116+
Type: event.FocusCallSDPStreamMetadataChanged,
117+
Content: event.Content{
118+
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
119+
SDPStreamMetadata: c.getAvailableStreamsFor(participantID),
120+
},
121+
},
118122
})
119123
}
120124
}

pkg/config/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ func LoadConfigFromString(configString string) (*Config, error) {
7777
if config.Matrix.UserID == "" ||
7878
config.Matrix.HomeserverURL == "" ||
7979
config.Matrix.AccessToken == "" ||
80-
config.Conference.KeepAliveTimeout == 0 {
80+
config.Conference.KeepAliveTimeout == 0 ||
81+
config.Conference.KeepAliveTimeout > 30 ||
82+
config.Conference.PingInterval < 30 {
8183
return nil, errors.New("invalid config values")
8284
}
8385

pkg/peer/keepalive.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,26 @@ package peer
22

33
import "time"
44

5-
type HeartBeat struct{}
5+
type Pong struct{}
66

77
// Starts a goroutine that will execute `onDeadLine` closure in case nothing has been published
88
// to the `heartBeat` channel for `deadline` duration. The goroutine stops once the channel is closed.
9-
func startKeepAlive(deadline time.Duration, heartBeat <-chan HeartBeat, onDeadLine func()) {
9+
func startKeepAlive(
10+
interval time.Duration,
11+
deadline time.Duration,
12+
pong <-chan Pong,
13+
sendPing func(),
14+
onDeadLine func(),
15+
) {
1016
go func() {
11-
for {
17+
for range time.Tick(interval) {
18+
sendPing()
19+
1220
select {
1321
case <-time.After(deadline):
1422
onDeadLine()
1523
return
16-
case _, ok := <-heartBeat:
24+
case _, ok := <-pong:
1725
if !ok {
1826
return
1927
}

pkg/peer/peer.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/pion/webrtc/v3"
1212
"github.com/sirupsen/logrus"
1313
"golang.org/x/exp/slices"
14-
"maunium.net/go/mautrix/event"
1514
)
1615

1716
var (
@@ -34,7 +33,11 @@ type Peer[ID comparable] struct {
3433
logger *logrus.Entry
3534
peerConnection *webrtc.PeerConnection
3635
sink *common.MessageSink[ID, MessageContent]
37-
heartbeat chan HeartBeat
36+
37+
pong chan Pong
38+
sendPing func()
39+
pingInterval time.Duration
40+
keepAliveDeadline time.Duration
3841

3942
dataChannelMutex sync.Mutex
4043
dataChannel *webrtc.DataChannel
@@ -45,7 +48,9 @@ func NewPeer[ID comparable](
4548
sdpOffer string,
4649
sink *common.MessageSink[ID, MessageContent],
4750
logger *logrus.Entry,
51+
pingInterval time.Duration,
4852
keepAliveDeadline time.Duration,
53+
sendPing func(),
4954
) (*Peer[ID], *webrtc.SessionDescription, error) {
5055
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
5156
if err != nil {
@@ -57,7 +62,11 @@ func NewPeer[ID comparable](
5762
logger: logger,
5863
peerConnection: peerConnection,
5964
sink: sink,
60-
heartbeat: make(chan HeartBeat, common.UnboundedChannelSize),
65+
66+
pong: make(chan Pong, common.UnboundedChannelSize),
67+
pingInterval: pingInterval,
68+
keepAliveDeadline: keepAliveDeadline,
69+
sendPing: sendPing,
6170
}
6271

6372
peerConnection.OnTrack(peer.onRtpTrackReceived)
@@ -72,8 +81,6 @@ func NewPeer[ID comparable](
7281
if sdpAnswer, err := peer.ProcessSDPOffer(sdpOffer); err != nil {
7382
return nil, nil, err
7483
} else {
75-
onDeadline := func() { peer.sink.Send(LeftTheCall{event.CallHangupKeepAliveTimeout}) }
76-
startKeepAlive(keepAliveDeadline, peer.heartbeat, onDeadline)
7784
return peer, sdpAnswer, nil
7885
}
7986
}
@@ -252,6 +259,6 @@ func (p *Peer[ID]) ProcessSDPOffer(sdpOffer string) (*webrtc.SessionDescription,
252259
// New heartbeat received (keep-alive message that is periodically sent by the remote peer).
253260
// We need to update the last heartbeat time. If the peer is not active for too long, we will
254261
// consider peer's connection as stalled and will close it.
255-
func (p *Peer[ID]) ProcessHeartbeat() {
256-
p.heartbeat <- HeartBeat{}
262+
func (p *Peer[ID]) ProcessPong() {
263+
p.pong <- Pong{}
257264
}

0 commit comments

Comments
 (0)