Skip to content

Commit 4ae7b80

Browse files
Merge pull request #70 from matrix-org/SimonBrandner/feat/event-shape
2 parents 9679c7f + 20e5dfc commit 4ae7b80

File tree

12 files changed

+172
-99
lines changed

12 files changed

+172
-99
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,4 @@ require (
3939
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
4040
)
4141

42-
replace maunium.net/go/mautrix => github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b
42+
replace maunium.net/go/mautrix => github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
2525
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
2626
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
2727
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
28-
github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b h1:qKvyphdDykNjyF1vJLaVuWCPfNJWNzP7wHvMV5mw+Ss=
29-
github.com/matrix-org/mautrix-go v0.0.0-20220817142816-160ea900a20b/go.mod h1:hHvNi5iKVAiI2MAdAeXHtP4g9BvNEX2rsQpSF/x6Kx4=
28+
github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b h1:yMsRQmsBWm7wJurYwnyd7H7wZWawhp52ca62W3MqDA8=
29+
github.com/matrix-org/mautrix-go v0.0.0-20221210135932-bd593dd0204b/go.mod h1:hHvNi5iKVAiI2MAdAeXHtP4g9BvNEX2rsQpSF/x6Kx4=
3030
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
3131
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
3232
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=

pkg/conference/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package conference
22

33
// Configuration for the group conferences (calls).
44
type Config struct {
5-
// Keep-alive timeout for WebRTC connections. If no keep-alive has been received
6-
// from the client for this duration, the connection is considered dead (in seconds).
5+
// Keep-alive timeout for WebRTC connections. If the client doesn't respond
6+
// to an `m.call.ping` with an `m.call.pong` for this amount of time, the
7+
// connection is considered dead. (in seconds, no greater then 30)
78
KeepAliveTimeout int `yaml:"timeout"`
9+
// The time after which we should send another m.call.ping event to the
10+
// client. (in seconds, greater then 30)
11+
PingInterval int `yaml:"pingInterval"`
812
}

pkg/conference/data_channel_message_processor.go

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,32 @@ package conference
22

33
import (
44
"github.com/pion/webrtc/v3"
5+
"github.com/sirupsen/logrus"
56
"golang.org/x/exp/slices"
67
"maunium.net/go/mautrix/event"
78
)
89

9-
// Handle the `SFUMessage` event from the DataChannel message.
10-
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
11-
participant.logger.Info("Received select request over DC")
10+
// Handle the `FocusEvent` from the DataChannel message.
11+
func (c *Conference) processTrackSubscriptionDCMessage(
12+
participant *Participant, msg event.FocusCallTrackSubscriptionEventContent,
13+
) {
14+
participant.logger.Info("Received track subscription request over DC")
15+
16+
// TODO: Handle unsubscribe
1217

1318
// Find tracks based on what we were asked for.
14-
tracks := c.getTracks(msg.Start)
19+
tracks := c.getTracks(msg.Subscribe)
20+
21+
participant.logger.WithFields(logrus.Fields{
22+
"tracks_we_got": tracks,
23+
"tracks_we_want": msg,
24+
}).Debug("Tracks to subscribe to")
1525

1626
// Let's check if we have all the tracks that we were asked for are there.
1727
// If not, we will list which are not available (later on we must inform participant
1828
// about it unless the participant retries it).
19-
if len(tracks) != len(msg.Start) {
20-
for _, expected := range msg.Start {
29+
if len(tracks) != len(msg.Subscribe) {
30+
for _, expected := range msg.Subscribe {
2131
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
2232
return track.ID() == expected.TrackID
2333
})
@@ -30,45 +40,58 @@ func (c *Conference) processSelectDCMessage(participant *Participant, msg event.
3040

3141
// Subscribe to the found tracks.
3242
for _, track := range tracks {
43+
participant.logger.WithField("track_id", track.ID()).Debug("Subscribing to track")
3344
if err := participant.peer.SubscribeTo(track); err != nil {
3445
participant.logger.Errorf("Failed to subscribe to track: %v", err)
3546
return
3647
}
3748
}
3849
}
3950

40-
func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
41-
participant.logger.Info("Received SDP answer over DC")
42-
43-
if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
44-
participant.logger.Errorf("Failed to set SDP answer: %v", err)
45-
return
46-
}
47-
}
51+
func (c *Conference) processNegotiateDCMessage(participant *Participant, msg event.FocusCallNegotiateEventContent) {
52+
participant.streamMetadata = msg.SDPStreamMetadata
4853

49-
func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
50-
participant.logger.Info("Received SDP offer over DC")
54+
switch msg.Description.Type {
55+
case event.CallDataTypeOffer:
56+
participant.logger.WithField("SDP", msg.Description.SDP).Trace("Received SDP offer over DC")
5157

52-
answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
53-
if err != nil {
54-
participant.logger.Errorf("Failed to set SDP offer: %v", err)
55-
return
56-
}
58+
answer, err := participant.peer.ProcessSDPOffer(msg.Description.SDP)
59+
if err != nil {
60+
participant.logger.Errorf("Failed to set SDP offer: %v", err)
61+
return
62+
}
5763

58-
participant.streamMetadata = msg.Metadata
64+
participant.sendDataChannelMessage(event.Event{
65+
Type: event.FocusCallNegotiate,
66+
Content: event.Content{
67+
Parsed: event.FocusCallNegotiateEventContent{
68+
Description: event.CallData{
69+
Type: event.CallDataType(answer.Type.String()),
70+
SDP: answer.SDP,
71+
},
72+
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
73+
},
74+
},
75+
})
76+
case event.CallDataTypeAnswer:
77+
participant.logger.WithField("SDP", msg.Description.SDP).Trace("Received SDP answer over DC")
5978

60-
participant.sendDataChannelMessage(event.SFUMessage{
61-
Op: event.SFUOperationAnswer,
62-
SDP: answer.SDP,
63-
Metadata: c.getAvailableStreamsFor(participant.id),
64-
})
79+
if err := participant.peer.ProcessSDPAnswer(msg.Description.SDP); err != nil {
80+
participant.logger.Errorf("Failed to set SDP answer: %v", err)
81+
return
82+
}
83+
default:
84+
participant.logger.Errorf("Unknown SDP description type")
85+
}
6586
}
6687

67-
func (c *Conference) processAliveDCMessage(participant *Participant) {
68-
participant.peer.ProcessHeartbeat()
88+
func (c *Conference) processPongDCMessage(participant *Participant) {
89+
participant.peer.ProcessPong()
6990
}
7091

71-
func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
72-
participant.streamMetadata = msg.Metadata
92+
func (c *Conference) processMetadataDCMessage(
93+
participant *Participant, msg event.FocusCallSDPStreamMetadataChangedEventContent,
94+
) {
95+
participant.streamMetadata = msg.SDPStreamMetadata
7396
c.resendMetadataToAllExcept(participant.id)
7497
}

pkg/conference/matrix_message_processor.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,25 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
5252
} else {
5353
messageSink := common.NewMessageSink(participantID, c.peerMessages)
5454

55-
keepAliveDeadline := time.Duration(c.config.KeepAliveTimeout) * time.Second
56-
peer, answer, err := peer.NewPeer(inviteEvent.Offer.SDP, messageSink, logger, keepAliveDeadline)
55+
peer, answer, err := peer.NewPeer(
56+
inviteEvent.Offer.SDP,
57+
messageSink,
58+
logger,
59+
peer.PingPongConfig{
60+
Interval: time.Duration(c.config.PingInterval) * time.Second,
61+
Deadline: time.Duration(c.config.KeepAliveTimeout) * time.Second,
62+
PongChannel: make(chan peer.Pong, common.UnboundedChannelSize),
63+
SendPing: func() {
64+
participant.sendDataChannelMessage(event.Event{
65+
Type: event.FocusCallPing,
66+
Content: event.Content{},
67+
})
68+
},
69+
OnDeadLine: func() {
70+
messageSink.Send(peer.LeftTheCall{Reason: event.CallHangupKeepAliveTimeout})
71+
},
72+
},
73+
)
5774
if err != nil {
5875
logger.WithError(err).Errorf("Failed to process SDP offer")
5976
return err

pkg/conference/participant.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package conference
22

33
import (
4-
"encoding/json"
54
"time"
65

76
"github.com/matrix-org/waterfall/pkg/peer"
@@ -46,8 +45,8 @@ func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient {
4645
}
4746
}
4847

49-
func (p *Participant) sendDataChannelMessage(toSend event.SFUMessage) {
50-
jsonToSend, err := json.Marshal(toSend)
48+
func (p *Participant) sendDataChannelMessage(toSend event.Event) {
49+
jsonToSend, err := toSend.MarshalJSON()
5150
if err != nil {
5251
p.logger.Error("Failed to marshal data channel message")
5352
return

pkg/conference/peer_message_processor.go

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package conference
22

33
import (
4-
"encoding/json"
54
"time"
65

76
"github.com/matrix-org/waterfall/pkg/peer"
@@ -68,40 +67,57 @@ func (c *Conference) processICEGatheringCompleteMessage(participant *Participant
6867

6968
func (c *Conference) processRenegotiationRequiredMessage(participant *Participant, msg peer.RenegotiationRequired) {
7069
participant.logger.Info("Started renegotiation")
71-
participant.sendDataChannelMessage(event.SFUMessage{
72-
Op: event.SFUOperationOffer,
73-
SDP: msg.Offer.SDP,
74-
Metadata: c.getAvailableStreamsFor(participant.id),
70+
participant.sendDataChannelMessage(event.Event{
71+
Type: event.FocusCallNegotiate,
72+
Content: event.Content{
73+
Parsed: event.FocusCallNegotiateEventContent{
74+
Description: event.CallData{
75+
Type: event.CallDataType(msg.Offer.Type.String()),
76+
SDP: msg.Offer.SDP,
77+
},
78+
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
79+
},
80+
},
7581
})
7682
}
7783

7884
func (c *Conference) processDataChannelMessage(participant *Participant, msg peer.DataChannelMessage) {
7985
participant.logger.Debug("Received data channel message")
80-
var sfuMessage event.SFUMessage
81-
if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil {
86+
var focusEvent event.Event
87+
if err := focusEvent.UnmarshalJSON([]byte(msg.Message)); err != nil {
8288
c.logger.Errorf("Failed to unmarshal SFU message: %v", err)
8389
return
8490
}
8591

86-
switch sfuMessage.Op {
87-
case event.SFUOperationSelect:
88-
c.processSelectDCMessage(participant, sfuMessage)
89-
case event.SFUOperationAnswer:
90-
c.processAnswerDCMessage(participant, sfuMessage)
91-
case event.SFUOperationPublish, event.SFUOperationUnpublish:
92-
c.processPublishDCMessage(participant, sfuMessage)
93-
case event.SFUOperationAlive:
94-
c.processAliveDCMessage(participant)
95-
case event.SFUOperationMetadata:
96-
c.processMetadataDCMessage(participant, sfuMessage)
92+
// FIXME: We should be able to do
93+
// focusEvent.Content.ParseRaw(focusEvent.Type) but it throws an error.
94+
switch focusEvent.Type.Type {
95+
case event.FocusCallTrackSubscription.Type:
96+
focusEvent.Content.ParseRaw(event.FocusCallTrackSubscription)
97+
c.processTrackSubscriptionDCMessage(participant, *focusEvent.Content.AsFocusCallTrackSubscription())
98+
case event.FocusCallNegotiate.Type:
99+
focusEvent.Content.ParseRaw(event.FocusCallNegotiate)
100+
c.processNegotiateDCMessage(participant, *focusEvent.Content.AsFocusCallNegotiate())
101+
case event.FocusCallPong.Type:
102+
focusEvent.Content.ParseRaw(event.FocusCallPong)
103+
c.processPongDCMessage(participant)
104+
case event.FocusCallSDPStreamMetadataChanged.Type:
105+
focusEvent.Content.ParseRaw(event.FocusCallSDPStreamMetadataChanged)
106+
c.processMetadataDCMessage(participant, *focusEvent.Content.AsFocusCallSDPStreamMetadataChanged())
107+
default:
108+
participant.logger.WithField("type", focusEvent.Type.Type).Warn("Received data channel message of unknown type")
97109
}
98110
}
99111

100112
func (c *Conference) processDataChannelAvailableMessage(participant *Participant, msg peer.DataChannelAvailable) {
101113
participant.logger.Info("Connected data channel")
102-
participant.sendDataChannelMessage(event.SFUMessage{
103-
Op: event.SFUOperationMetadata,
104-
Metadata: c.getAvailableStreamsFor(participant.id),
114+
participant.sendDataChannelMessage(event.Event{
115+
Type: event.FocusCallSDPStreamMetadataChanged,
116+
Content: event.Content{
117+
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
118+
SDPStreamMetadata: c.getAvailableStreamsFor(participant.id),
119+
},
120+
},
105121
})
106122
}
107123

pkg/conference/state.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (c *Conference) getAvailableStreamsFor(forParticipant ParticipantID) event.
9494
}
9595

9696
// Helper that returns the list of streams inside this conference that match the given stream IDs and track IDs.
97-
func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrtc.TrackLocalStaticRTP {
97+
func (c *Conference) getTracks(identifiers []event.FocusTrackDescription) []*webrtc.TrackLocalStaticRTP {
9898
tracks := make([]*webrtc.TrackLocalStaticRTP, 0)
9999
for _, participant := range c.participants {
100100
// Check if this participant has any of the tracks that we're looking for.
@@ -112,9 +112,13 @@ func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrt
112112
func (c *Conference) resendMetadataToAllExcept(exceptMe ParticipantID) {
113113
for participantID, participant := range c.participants {
114114
if participantID != exceptMe {
115-
participant.sendDataChannelMessage(event.SFUMessage{
116-
Op: event.SFUOperationMetadata,
117-
Metadata: c.getAvailableStreamsFor(participantID),
115+
participant.sendDataChannelMessage(event.Event{
116+
Type: event.FocusCallSDPStreamMetadataChanged,
117+
Content: event.Content{
118+
Parsed: event.FocusCallSDPStreamMetadataChangedEventContent{
119+
SDPStreamMetadata: c.getAvailableStreamsFor(participantID),
120+
},
121+
},
118122
})
119123
}
120124
}

pkg/config/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,13 @@ func LoadConfigFromString(configString string) (*Config, error) {
7474
return nil, fmt.Errorf("failed to unmarshal YAML file: %w", err)
7575
}
7676

77+
// TODO: We should split these up and add error messages
7778
if config.Matrix.UserID == "" ||
7879
config.Matrix.HomeserverURL == "" ||
7980
config.Matrix.AccessToken == "" ||
80-
config.Conference.KeepAliveTimeout == 0 {
81+
config.Conference.KeepAliveTimeout == 0 ||
82+
config.Conference.KeepAliveTimeout > 30 ||
83+
config.Conference.PingInterval < 30 {
8184
return nil, errors.New("invalid config values")
8285
}
8386

pkg/peer/keepalive.go

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)