Skip to content

Commit 07ed8d7

Browse files
committed
Further refactor code
Signed-off-by: Šimon Brandner <[email protected]>
1 parent 5a83a8f commit 07ed8d7

File tree

5 files changed

+299
-236
lines changed

5 files changed

+299
-236
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package conference
2+
3+
import (
4+
"github.com/pion/webrtc/v3"
5+
"golang.org/x/exp/slices"
6+
"maunium.net/go/mautrix/event"
7+
)
8+
9+
// Handle the `SFUMessage` event from the DataChannel message.
10+
func (c *Conference) processSelectDCMessage(participant *Participant, msg event.SFUMessage) {
11+
participant.logger.Info("Received select request over DC")
12+
13+
// Find tracks based on what we were asked for.
14+
tracks := c.getTracks(msg.Start)
15+
16+
// Let's check if we have all the tracks that we were asked for are there.
17+
// If not, we will list which are not available (later on we must inform participant
18+
// about it unless the participant retries it).
19+
if len(tracks) != len(msg.Start) {
20+
for _, expected := range msg.Start {
21+
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
22+
return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID
23+
})
24+
25+
if found == -1 {
26+
c.logger.Warnf("Track not found: %s", expected.TrackID)
27+
}
28+
}
29+
}
30+
31+
// Subscribe to the found tracks.
32+
for _, track := range tracks {
33+
if err := participant.peer.SubscribeTo(track); err != nil {
34+
participant.logger.Errorf("Failed to subscribe to track: %v", err)
35+
return
36+
}
37+
}
38+
}
39+
40+
func (c *Conference) processAnswerDCMessage(participant *Participant, msg event.SFUMessage) {
41+
participant.logger.Info("Received SDP answer over DC")
42+
43+
if err := participant.peer.ProcessSDPAnswer(msg.SDP); err != nil {
44+
participant.logger.Errorf("Failed to set SDP answer: %v", err)
45+
return
46+
}
47+
}
48+
49+
func (c *Conference) processPublishDCMessage(participant *Participant, msg event.SFUMessage) {
50+
participant.logger.Info("Received SDP offer over DC")
51+
52+
answer, err := participant.peer.ProcessSDPOffer(msg.SDP)
53+
if err != nil {
54+
participant.logger.Errorf("Failed to set SDP offer: %v", err)
55+
return
56+
}
57+
58+
participant.streamMetadata = msg.Metadata
59+
60+
participant.sendDataChannelMessage(event.SFUMessage{
61+
Op: event.SFUOperationAnswer,
62+
SDP: answer.SDP,
63+
Metadata: c.getAvailableStreamsFor(participant.id),
64+
})
65+
}
66+
67+
func (c *Conference) processUnpublishDCMessage(participant *Participant) {
68+
participant.logger.Info("Received unpublish over DC")
69+
}
70+
71+
func (c *Conference) processAliveDCMessage(participant *Participant) {
72+
participant.peer.ProcessHeartbeat()
73+
}
74+
75+
func (c *Conference) processMetadataDCMessage(participant *Participant, msg event.SFUMessage) {
76+
participant.streamMetadata = msg.Metadata
77+
c.resendMetadataToAllExcept(participant.id)
78+
}
File renamed without changes.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package conference
2+
3+
import (
4+
"errors"
5+
6+
"github.com/matrix-org/waterfall/pkg/common"
7+
"github.com/matrix-org/waterfall/pkg/peer"
8+
"maunium.net/go/mautrix/event"
9+
)
10+
11+
// Listen on messages from incoming channels and process them.
12+
// This is essentially the main loop of the conference.
13+
// If this function returns, the conference is over.
14+
func (c *Conference) processMessages() {
15+
for {
16+
select {
17+
case msg := <-c.peerMessages:
18+
c.processPeerMessage(msg)
19+
case msg := <-c.matrixMessages.Channel:
20+
c.processMatrixMessage(msg)
21+
}
22+
23+
// If there are no more participants, stop the conference.
24+
if len(c.participants) == 0 {
25+
c.logger.Info("No more participants, stopping the conference")
26+
// Close the channel so that the sender can't push any messages.
27+
unreadMessages := c.matrixMessages.Close()
28+
29+
// Send the information that we ended to the owner and pass the message
30+
// that we did not process (so that we don't drop it silently).
31+
c.endNotifier.Notify(unreadMessages)
32+
return
33+
}
34+
}
35+
}
36+
37+
// Process a message from a local peer.
38+
func (c *Conference) processPeerMessage(message common.Message[ParticipantID, peer.MessageContent]) {
39+
participant := c.getParticipant(message.Sender, errors.New("received a message from a deleted participant"))
40+
if participant == nil {
41+
return
42+
}
43+
44+
// Since Go does not support ADTs, we have to use a switch statement to
45+
// determine the actual type of the message.
46+
switch msg := message.Content.(type) {
47+
case peer.JoinedTheCall:
48+
c.processJoinedTheCallMessage(participant, msg)
49+
case peer.LeftTheCall:
50+
c.processLeftTheCallMessage(participant, msg)
51+
case peer.NewTrackPublished:
52+
c.processNewTrackPublishedMessage(participant, msg)
53+
case peer.PublishedTrackFailed:
54+
c.processPublishedTrackFailedMessage(participant, msg)
55+
case peer.NewICECandidate:
56+
c.processNewICECandidateMessage(participant, msg)
57+
case peer.ICEGatheringComplete:
58+
c.processICEGatheringCompleteMessage(participant, msg)
59+
case peer.RenegotiationRequired:
60+
c.processRenegotiationRequiredMessage(participant, msg)
61+
case peer.DataChannelMessage:
62+
c.processDataChannelMessage(participant, msg)
63+
case peer.DataChannelAvailable:
64+
c.processDataChannelAvailableMessage(participant, msg)
65+
case peer.ForwardRTCP:
66+
c.processForwardRTCPMessage(msg)
67+
case peer.PLISent:
68+
c.processPLISentMessage(msg)
69+
default:
70+
c.logger.Errorf("Unknown message type: %T", msg)
71+
}
72+
}
73+
74+
func (c *Conference) processMatrixMessage(msg MatrixMessage) {
75+
switch ev := msg.Content.(type) {
76+
case *event.CallInviteEventContent:
77+
c.onNewParticipant(msg.Sender, ev)
78+
case *event.CallCandidatesEventContent:
79+
c.onCandidates(msg.Sender, ev)
80+
case *event.CallSelectAnswerEventContent:
81+
c.onSelectAnswer(msg.Sender, ev)
82+
case *event.CallHangupEventContent:
83+
c.onHangup(msg.Sender, ev)
84+
default:
85+
c.logger.Errorf("Unexpected event type: %T", ev)
86+
}
87+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package conference
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/matrix-org/waterfall/pkg/peer"
7+
"github.com/pion/webrtc/v3"
8+
"maunium.net/go/mautrix/event"
9+
)
10+
11+
func (c *Conference) processJoinedTheCallMessage(participant *Participant, message peer.JoinedTheCall) {
12+
participant.logger.Info("Joined the call")
13+
}
14+
15+
func (c *Conference) processLeftTheCallMessage(participant *Participant, msg peer.LeftTheCall) {
16+
participant.logger.Info("Left the call: %s", msg.Reason)
17+
c.removeParticipant(participant.id)
18+
c.signaling.SendHangup(participant.asMatrixRecipient(), msg.Reason)
19+
}
20+
21+
func (c *Conference) processNewTrackPublishedMessage(participant *Participant, msg peer.NewTrackPublished) {
22+
participant.logger.Infof("Published new track: %s", msg.Track.ID())
23+
key := event.SFUTrackDescription{
24+
StreamID: msg.Track.StreamID(),
25+
TrackID: msg.Track.ID(),
26+
}
27+
28+
if _, ok := participant.publishedTracks[key]; ok {
29+
c.logger.Errorf("Track already published: %v", key)
30+
return
31+
}
32+
33+
participant.publishedTracks[key] = PublishedTrack{Track: msg.Track}
34+
c.resendMetadataToAllExcept(participant.id)
35+
}
36+
37+
func (c *Conference) processPublishedTrackFailedMessage(participant *Participant, msg peer.PublishedTrackFailed) {
38+
participant.logger.Infof("Failed published track: %s", msg.Track.ID())
39+
delete(participant.publishedTracks, event.SFUTrackDescription{
40+
StreamID: msg.Track.StreamID(),
41+
TrackID: msg.Track.ID(),
42+
})
43+
44+
for _, otherParticipant := range c.participants {
45+
if otherParticipant.id == participant.id {
46+
continue
47+
}
48+
49+
otherParticipant.peer.UnsubscribeFrom([]*webrtc.TrackLocalStaticRTP{msg.Track})
50+
}
51+
52+
c.resendMetadataToAllExcept(participant.id)
53+
}
54+
55+
func (c *Conference) processNewICECandidateMessage(participant *Participant, msg peer.NewICECandidate) {
56+
participant.logger.Debug("Received a new local ICE candidate")
57+
58+
// Convert WebRTC ICE candidate to Matrix ICE candidate.
59+
jsonCandidate := msg.Candidate.ToJSON()
60+
candidates := []event.CallCandidate{{
61+
Candidate: jsonCandidate.Candidate,
62+
SDPMLineIndex: int(*jsonCandidate.SDPMLineIndex),
63+
SDPMID: *jsonCandidate.SDPMid,
64+
}}
65+
c.signaling.SendICECandidates(participant.asMatrixRecipient(), candidates)
66+
}
67+
68+
func (c *Conference) processICEGatheringCompleteMessage(participant *Participant, msg peer.ICEGatheringComplete) {
69+
participant.logger.Info("Completed local ICE gathering")
70+
71+
// Send an empty array of candidates to indicate that ICE gathering is complete.
72+
c.signaling.SendCandidatesGatheringFinished(participant.asMatrixRecipient())
73+
}
74+
75+
func (c *Conference) processRenegotiationRequiredMessage(participant *Participant, msg peer.RenegotiationRequired) {
76+
participant.logger.Info("Started renegotiation")
77+
participant.sendDataChannelMessage(event.SFUMessage{
78+
Op: event.SFUOperationOffer,
79+
SDP: msg.Offer.SDP,
80+
Metadata: c.getAvailableStreamsFor(participant.id),
81+
})
82+
}
83+
84+
func (c *Conference) processDataChannelMessage(participant *Participant, msg peer.DataChannelMessage) {
85+
participant.logger.Debug("Received data channel message")
86+
var sfuMessage event.SFUMessage
87+
if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil {
88+
c.logger.Errorf("Failed to unmarshal SFU message: %v", err)
89+
return
90+
}
91+
92+
switch sfuMessage.Op {
93+
case event.SFUOperationSelect:
94+
c.processSelectDCMessage(participant, sfuMessage)
95+
case event.SFUOperationAnswer:
96+
c.processAnswerDCMessage(participant, sfuMessage)
97+
case event.SFUOperationPublish:
98+
c.processPublishDCMessage(participant, sfuMessage)
99+
case event.SFUOperationUnpublish:
100+
c.processUnpublishDCMessage(participant)
101+
case event.SFUOperationAlive:
102+
c.processAliveDCMessage(participant)
103+
case event.SFUOperationMetadata:
104+
c.processMetadataDCMessage(participant, sfuMessage)
105+
}
106+
}
107+
108+
func (c *Conference) processDataChannelAvailableMessage(participant *Participant, msg peer.DataChannelAvailable) {
109+
participant.logger.Info("Connected data channel")
110+
participant.sendDataChannelMessage(event.SFUMessage{
111+
Op: event.SFUOperationMetadata,
112+
Metadata: c.getAvailableStreamsFor(participant.id),
113+
})
114+
}
115+
116+
func (c *Conference) processForwardRTCPMessage(msg peer.ForwardRTCP) {
117+
for _, participant := range c.participants {
118+
for _, publishedTrack := range participant.publishedTracks {
119+
if publishedTrack.Track.StreamID() == msg.StreamID && publishedTrack.Track.ID() == msg.TrackID {
120+
participant.peer.WriteRTCP(msg.Packets, msg.StreamID, msg.TrackID, publishedTrack.LastPLITimestamp.Load())
121+
}
122+
}
123+
}
124+
}
125+
126+
func (c *Conference) processPLISentMessage(msg peer.PLISent) {
127+
for _, participant := range c.participants {
128+
for _, publishedTrack := range participant.publishedTracks {
129+
if publishedTrack.Track.StreamID() == msg.StreamID && publishedTrack.Track.ID() == msg.TrackID {
130+
publishedTrack.LastPLITimestamp.Store(msg.Timestamp)
131+
}
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)