Skip to content

Commit 993d585

Browse files
conference: fix the subscribe/unsubscribe logic
1 parent bd0ed3b commit 993d585

File tree

6 files changed

+44
-35
lines changed

6 files changed

+44
-35
lines changed

pkg/conference/matrix.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
2121
logger := c.logger.WithFields(logrus.Fields{
2222
"user_id": participantID.UserID,
2323
"device_id": participantID.DeviceID,
24-
"call_id": participantID.CallID,
2524
})
2625

2726
logger.Info("Incoming call invite")

pkg/conference/processor.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/matrix-org/waterfall/pkg/common"
88
"github.com/matrix-org/waterfall/pkg/peer"
99
"github.com/pion/webrtc/v3"
10+
"golang.org/x/exp/slices"
1011
"maunium.net/go/mautrix/event"
1112
)
1213

@@ -85,7 +86,7 @@ func (c *Conference) processPeerMessage(message common.Message[ParticipantID, pe
8586
}
8687

8788
case peer.NewICECandidate:
88-
participant.logger.Info("Received a new local ICE candidate")
89+
participant.logger.Debug("Received a new local ICE candidate")
8990

9091
// Convert WebRTC ICE candidate to Matrix ICE candidate.
9192
jsonCandidate := msg.Candidate.ToJSON()
@@ -111,7 +112,7 @@ func (c *Conference) processPeerMessage(message common.Message[ParticipantID, pe
111112
})
112113

113114
case peer.DataChannelMessage:
114-
participant.logger.Info("Sent data channel message")
115+
participant.logger.Debug("Received data channel message")
115116
var sfuMessage event.SFUMessage
116117
if err := json.Unmarshal([]byte(msg.Message), &sfuMessage); err != nil {
117118
c.logger.Errorf("Failed to unmarshal SFU message: %v", err)
@@ -136,30 +137,44 @@ func (c *Conference) processPeerMessage(message common.Message[ParticipantID, pe
136137
func (c *Conference) handleDataChannelMessage(participant *Participant, sfuMessage event.SFUMessage) {
137138
switch sfuMessage.Op {
138139
case event.SFUOperationSelect:
139-
participant.logger.Info("Sent select request over DC")
140-
141-
// Get the tracks that correspond to the tracks that the participant wants to receive.
142-
for _, track := range c.getTracks(sfuMessage.Start) {
143-
if track == nil {
144-
participant.logger.Errorf("Bug, track is nil")
140+
participant.logger.Info("Received select request over DC")
141+
142+
// Find tracks based on what we were asked for.
143+
tracks := c.getTracks(sfuMessage.Start)
144+
145+
// Let's check if we have all the tracks that we were asked for are there.
146+
// If not, we will list which are not available (later on we must inform participant
147+
// about it unless the participant retries it).
148+
if len(tracks) != len(sfuMessage.Start) {
149+
for _, expected := range sfuMessage.Start {
150+
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
151+
return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID
152+
})
153+
154+
if found == -1 {
155+
c.logger.Warnf("Track not found: %s", expected.TrackID)
156+
}
145157
}
158+
}
146159

160+
// Subscribe to the found tracks.
161+
for _, track := range tracks {
147162
if err := participant.peer.SubscribeTo(track); err != nil {
148163
participant.logger.Errorf("Failed to subscribe to track: %v", err)
149164
return
150165
}
151166
}
152167

153168
case event.SFUOperationAnswer:
154-
participant.logger.Info("Sent SDP answer over DC")
169+
participant.logger.Info("Received SDP answer over DC")
155170

156171
if err := participant.peer.ProcessSDPAnswer(sfuMessage.SDP); err != nil {
157172
participant.logger.Errorf("Failed to set SDP answer: %v", err)
158173
return
159174
}
160175

161176
case event.SFUOperationPublish:
162-
participant.logger.Info("Sent SDP offer over DC")
177+
participant.logger.Info("Received SDP offer over DC")
163178

164179
answer, err := participant.peer.ProcessSDPOffer(sfuMessage.SDP)
165180
if err != nil {
@@ -173,13 +188,13 @@ func (c *Conference) handleDataChannelMessage(participant *Participant, sfuMessa
173188
})
174189

175190
case event.SFUOperationUnpublish:
176-
participant.logger.Info("Sent unpublish over DC")
191+
participant.logger.Info("Received unpublish over DC")
177192

178193
// TODO: Clarify the semantics of unpublish.
179194
case event.SFUOperationAlive:
180195
// FIXME: Handle the heartbeat message here (updating the last timestamp etc).
181196
case event.SFUOperationMetadata:
182-
participant.logger.Info("Sent metadata over DC")
197+
participant.logger.Info("Received metadata over DC")
183198

184199
participant.streamMetadata = sfuMessage.Metadata
185200
c.resendMetadataToAllExcept(participant.id)

pkg/conference/state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrt
8888
}
8989
}
9090
}
91+
9192
return tracks
9293
}
9394

pkg/peer/peer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ func (p *Peer[ID]) SubscribeTo(track *webrtc.TrackLocalStaticRTP) error {
106106
func (p *Peer[ID]) UnsubscribeFrom(tracks []*webrtc.TrackLocalStaticRTP) {
107107
// That's unfortunately an O(m*n) operation, but we don't expect the number of tracks to be big.
108108
for _, presentTrack := range p.peerConnection.GetSenders() {
109+
if presentTrack.Track() == nil {
110+
continue
111+
}
112+
109113
for _, trackToUnsubscribe := range tracks {
110114
presentTrackID, presentStreamID := presentTrack.Track().ID(), presentTrack.Track().StreamID()
111115
trackID, streamID := trackToUnsubscribe.ID(), trackToUnsubscribe.StreamID()
@@ -165,7 +169,6 @@ func (p *Peer[ID]) ProcessSDPAnswer(sdpAnswer string) error {
165169

166170
// Applies the sdp offer received from the remote peer and generates an SDP answer.
167171
func (p *Peer[ID]) ProcessSDPOffer(sdpOffer string) (*webrtc.SessionDescription, error) {
168-
p.logger.WithField("sdpOffer", sdpOffer).Debug("processing SDP offer")
169172
err := p.peerConnection.SetRemoteDescription(webrtc.SessionDescription{
170173
Type: webrtc.SDPTypeOffer,
171174
SDP: sdpOffer,

pkg/peer/webrtc.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func (p *Peer[ID]) onRtpTrackReceived(remoteTrack *webrtc.TrackRemote, receiver
2323
rtcp := []rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(remoteTrack.SSRC())}}
2424
if rtcpSendErr := p.peerConnection.WriteRTCP(rtcp); rtcpSendErr != nil {
2525
p.logger.Errorf("Failed to send RTCP PLI: %v", rtcpSendErr)
26+
return
2627
}
2728
}
2829
}()
@@ -56,12 +57,14 @@ func (p *Peer[ID]) onRtpTrackReceived(remoteTrack *webrtc.TrackRemote, receiver
5657
p.logger.WithError(readErr).Error("failed to read from remote track")
5758
}
5859
p.sink.Send(PublishedTrackFailed{Track: localTrack})
60+
return
5961
}
6062

6163
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet.
6264
if _, err = localTrack.Write(rtpBuf[:index]); err != nil && !errors.Is(err, io.ErrClosedPipe) {
6365
p.logger.WithError(err).Error("failed to write to local track")
6466
p.sink.Send(PublishedTrackFailed{Track: localTrack})
67+
return
6568
}
6669
}
6770
}()
@@ -98,7 +101,7 @@ func (p *Peer[ID]) onNegotiationNeeded() {
98101

99102
// A callback that is called once we receive an ICE connection state change for this peer connection.
100103
func (p *Peer[ID]) onICEConnectionStateChanged(state webrtc.ICEConnectionState) {
101-
p.logger.WithField("state", state).Debug("ICE connection state changed")
104+
p.logger.WithField("state", state).Info("ICE connection state changed")
102105

103106
// TODO: Ask Simon if we should do it here as in the previous implementation.
104107
switch state {
@@ -120,7 +123,7 @@ func (p *Peer[ID]) onSignalingStateChanged(state webrtc.SignalingState) {
120123
}
121124

122125
func (p *Peer[ID]) onConnectionStateChanged(state webrtc.PeerConnectionState) {
123-
p.logger.WithField("state", state).Debug("connection state changed")
126+
p.logger.WithField("state", state).Info("Connection state changed")
124127

125128
switch state {
126129
case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateClosed:
@@ -136,33 +139,33 @@ func (p *Peer[ID]) onDataChannelReady(dc *webrtc.DataChannel) {
136139
defer p.dataChannelMutex.Unlock()
137140

138141
if p.dataChannel != nil {
139-
p.logger.Error("data channel already exists")
142+
p.logger.Error("Data channel already exists")
140143
p.dataChannel.Close()
141144
return
142145
}
143146

144147
p.dataChannel = dc
145-
p.logger.WithField("label", dc.Label()).Info("data channel ready")
148+
p.logger.WithField("label", dc.Label()).Info("Data channel ready")
146149

147150
dc.OnOpen(func() {
148-
p.logger.Info("data channel opened")
151+
p.logger.Info("Data channel opened")
149152
p.sink.Send(DataChannelAvailable{})
150153
})
151154

152155
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
153-
p.logger.WithField("message", msg).Debug("data channel message received")
156+
p.logger.WithField("message", msg).Debug("Data channel message received")
154157
if msg.IsString {
155158
p.sink.Send(DataChannelMessage{Message: string(msg.Data)})
156159
} else {
157-
p.logger.Warn("data channel message is not a string, ignoring")
160+
p.logger.Warn("Data channel message is not a string, ignoring")
158161
}
159162
})
160163

161164
dc.OnError(func(err error) {
162-
p.logger.WithError(err).Error("data channel error")
165+
p.logger.WithError(err).Error("Data channel error")
163166
})
164167

165168
dc.OnClose(func() {
166-
p.logger.Info("data channel closed")
169+
p.logger.Info("Data channel closed")
167170
})
168171
}

pkg/signaling/matrix.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ limitations under the License.
1717
package signaling
1818

1919
import (
20-
"encoding/json"
21-
2220
"github.com/sirupsen/logrus"
2321
"maunium.net/go/mautrix"
2422
"maunium.net/go/mautrix/event"
@@ -144,16 +142,6 @@ func (m *MatrixForConference) sendToDevice(user MatrixRecipient, eventType event
144142
},
145143
}
146144

147-
{
148-
// TODO: Remove this once
149-
serialized, err := json.Marshal(sendRequest)
150-
if err != nil {
151-
logger.WithError(err).Error("Failed to serialize to-device message")
152-
return
153-
}
154-
logger.Debugf("Sending to-device message: %s", string(serialized))
155-
}
156-
157145
if _, err := m.client.SendToDevice(eventType, sendRequest); err != nil {
158146
logger.Errorf("failed to send to-device event: %w", err)
159147
}

0 commit comments

Comments
 (0)