@@ -18,6 +18,7 @@ package main
1818
1919import (
2020 "encoding/json"
21+ "sync"
2122 "time"
2223
2324 "github.com/pion/webrtc/v3"
@@ -28,18 +29,49 @@ import (
2829)
2930
3031type Call struct {
31- CallID string
32- UserID id.UserID
33- DeviceID id.DeviceID
34- LocalSessionID id.SessionID
35- RemoteSessionID id.SessionID
36- Client * mautrix.Client
37- PeerConnection * webrtc.PeerConnection
38- Conf * Conference
32+ PeerConnection * webrtc.PeerConnection
33+
34+ CallID string
35+ UserID id.UserID
36+ DeviceID id.DeviceID
37+ LocalSessionID id.SessionID
38+ RemoteSessionID id.SessionID
39+
40+ Publishers []* Publisher
41+ Subscribers []* Subscriber
42+
43+ mutex sync.RWMutex
44+ logger * logrus.Entry
45+ client * mautrix.Client
46+ conf * Conference
47+
3948 dataChannel * webrtc.DataChannel
4049 lastKeepAliveTimestamp time.Time
4150 sentEndOfCandidates bool
42- logger * logrus.Entry
51+ }
52+
53+ func NewCall (callID string , conf * Conference ) * Call {
54+ call := new (Call )
55+
56+ call .CallID = callID
57+ call .conf = conf
58+
59+ return call
60+ }
61+
62+ func (c * Call ) InitWithInvite (evt * event.Event , client * mautrix.Client ) {
63+ invite := evt .Content .AsCallInvite ()
64+
65+ c .UserID = evt .Sender
66+ c .DeviceID = invite .DeviceID
67+ // XXX: What if an SFU gets restarted?
68+ c .LocalSessionID = localSessionID
69+ c .RemoteSessionID = invite .SenderSessionID
70+ c .client = client
71+ c .logger = logrus .WithFields (logrus.Fields {
72+ "user_id" : evt .Sender ,
73+ "conf_id" : invite .ConfID ,
74+ })
4375}
4476
4577func (c * Call ) onDCSelect (start []event.SFUTrackDescription ) {
@@ -55,21 +87,9 @@ func (c *Call) onDCSelect(start []event.SFUTrackDescription) {
5587
5688 trackLogger .Info ("selecting track" )
5789
58- foundTracks := c .Conf .GetLocalTrackByInfo (LocalTrackInfo {
59- StreamID : trackDesc .StreamID ,
60- TrackID : trackDesc .TrackID ,
61- })
62-
63- if len (foundTracks ) == 0 {
64- trackLogger .Info ("no track found" )
65- continue
66- }
67-
68- for _ , track := range foundTracks {
69- if _ , err := c .PeerConnection .AddTrack (track ); err == nil {
70- trackLogger .Info ("added track" )
71- } else {
72- trackLogger .WithError (err ).Error ("failed to add track" )
90+ for _ , publisher := range c .conf .GetPublishers () {
91+ if publisher .Matches (trackDesc ) {
92+ publisher .Subscribe (c )
7393 }
7494 }
7595 }
@@ -114,12 +134,19 @@ func (c *Call) onDCUnpublish(stop []event.SFUTrackDescription, sdp string) {
114134
115135 trackLogger .Info ("unpublishing track" )
116136
117- if removedTracksCount := c .Conf .RemoveTracksFromPeerConnectionsByInfo (LocalTrackInfo {
118- StreamID : trackDesc .StreamID ,
119- TrackID : trackDesc .TrackID ,
120- }); removedTracksCount == 0 {
121- trackLogger .Info ("no tracks to remove" )
137+ newPublishers := []* Publisher {}
138+
139+ c .mutex .Lock ()
140+ for _ , publisher := range c .Publishers {
141+ if publisher .Matches (trackDesc ) {
142+ publisher .Stop ()
143+ } else {
144+ newPublishers = append (newPublishers , publisher )
145+ }
122146 }
147+
148+ c .Publishers = newPublishers
149+ c .mutex .Unlock ()
123150 }
124151
125152 err := c .PeerConnection .SetRemoteDescription (webrtc.SessionDescription {
@@ -169,7 +196,7 @@ func (c *Call) onDCAlive() {
169196func (c * Call ) onDCMetadata () {
170197 c .logger .Info ("received DC metadata" )
171198
172- c .Conf .SendUpdatedMetadataFromCall (c .CallID )
199+ c .conf .SendUpdatedMetadataFromCall (c .CallID )
173200}
174201
175202func (c * Call ) dataChannelHandler (channel * webrtc.DataChannel ) {
@@ -196,7 +223,7 @@ func (c *Call) dataChannelHandler(channel *webrtc.DataChannel) {
196223 }
197224
198225 if msg .Metadata != nil {
199- c .Conf . UpdateSDPStreamMetadata (c .DeviceID , msg .Metadata )
226+ c .conf . Metadata . Update (c .DeviceID , msg .Metadata )
200227 }
201228
202229 switch msg .Op {
@@ -255,11 +282,11 @@ func (c *Call) iceCandidateHandler(candidate *webrtc.ICECandidate) {
255282 Parsed : event.CallCandidatesEventContent {
256283 BaseCallEventContent : event.BaseCallEventContent {
257284 CallID : c .CallID ,
258- ConfID : c .Conf .ConfID ,
259- DeviceID : c .Client .DeviceID ,
285+ ConfID : c .conf .ConfID ,
286+ DeviceID : c .client .DeviceID ,
260287 SenderSessionID : c .LocalSessionID ,
261288 DestSessionID : c .RemoteSessionID ,
262- PartyID : string (c .Client .DeviceID ),
289+ PartyID : string (c .client .DeviceID ),
263290 Version : event .CallVersion ("1" ),
264291 },
265292 Candidates : []event.CallCandidate {{
@@ -273,43 +300,13 @@ func (c *Call) iceCandidateHandler(candidate *webrtc.ICECandidate) {
273300}
274301
275302func (c * Call ) trackHandler (trackRemote * webrtc.TrackRemote ) {
276- trackLogger := c .logger .WithFields (logrus.Fields {
277- "track_id" : trackRemote .ID (),
278- "track_kind" : trackRemote .Kind (),
279- "stream_id" : trackRemote .StreamID (),
280- })
303+ publisher := NewPublisher (trackRemote , c )
281304
282- go WriteRTCP (trackRemote , c .PeerConnection , trackLogger )
305+ c .mutex .Lock ()
306+ c .Publishers = append (c .Publishers , publisher )
307+ c .mutex .Unlock ()
283308
284- trackLocal , err := webrtc .NewTrackLocalStaticRTP (
285- trackRemote .Codec ().RTPCodecCapability ,
286- trackRemote .ID (),
287- trackRemote .StreamID (),
288- )
289- if err != nil {
290- trackLogger .
291- WithField ("capability" , trackRemote .Codec ().RTPCodecCapability ).
292- WithError (err ).
293- Error ("failed to create new track local static RTP - ignoring" )
294-
295- return
296- }
297-
298- c .Conf .Tracks .Mutex .Lock ()
299- c .Conf .Tracks .Tracks = append (c .Conf .Tracks .Tracks , LocalTrackWithInfo {
300- Track : trackLocal ,
301- Info : LocalTrackInfo {
302- TrackID : trackLocal .ID (),
303- StreamID : trackLocal .StreamID (),
304- Call : c ,
305- },
306- })
307- c .Conf .Tracks .Mutex .Unlock ()
308-
309- trackLogger .Info ("published track" )
310-
311- go c .Conf .SendUpdatedMetadataFromCall (c .CallID )
312- go CopyRemoteToLocal (trackRemote , trackLocal , trackLogger )
309+ go c .conf .SendUpdatedMetadataFromCall (c .CallID )
313310}
314311
315312func (c * Call ) iceConnectionStateHandler (state webrtc.ICEConnectionState ) {
@@ -322,11 +319,11 @@ func (c *Call) iceConnectionStateHandler(state webrtc.ICEConnectionState) {
322319 Parsed : event.CallCandidatesEventContent {
323320 BaseCallEventContent : event.BaseCallEventContent {
324321 CallID : c .CallID ,
325- ConfID : c .Conf .ConfID ,
326- DeviceID : c .Client .DeviceID ,
322+ ConfID : c .conf .ConfID ,
323+ DeviceID : c .client .DeviceID ,
327324 SenderSessionID : c .LocalSessionID ,
328325 DestSessionID : c .RemoteSessionID ,
329- PartyID : string (c .Client .DeviceID ),
326+ PartyID : string (c .client .DeviceID ),
330327 Version : event .CallVersion ("1" ),
331328 },
332329 Candidates : []event.CallCandidate {{Candidate : "" }},
@@ -339,7 +336,7 @@ func (c *Call) iceConnectionStateHandler(state webrtc.ICEConnectionState) {
339336}
340337
341338func (c * Call ) OnInvite (content * event.CallInviteEventContent ) {
342- c .Conf . UpdateSDPStreamMetadata (c .DeviceID , content .SDPStreamMetadata )
339+ c .conf . Metadata . Update (c .DeviceID , content .SDPStreamMetadata )
343340 offer := content .Offer
344341
345342 peerConnection , err := webrtc .NewPeerConnection (webrtc.Configuration {})
@@ -394,26 +391,26 @@ func (c *Call) OnInvite(content *event.CallInviteEventContent) {
394391 Parsed : event.CallAnswerEventContent {
395392 BaseCallEventContent : event.BaseCallEventContent {
396393 CallID : c .CallID ,
397- ConfID : c .Conf .ConfID ,
398- DeviceID : c .Client .DeviceID ,
394+ ConfID : c .conf .ConfID ,
395+ DeviceID : c .client .DeviceID ,
399396 SenderSessionID : c .LocalSessionID ,
400397 DestSessionID : c .RemoteSessionID ,
401- PartyID : string (c .Client .DeviceID ),
398+ PartyID : string (c .client .DeviceID ),
402399 Version : event .CallVersion ("1" ),
403400 },
404401 Answer : event.CallData {
405402 Type : "answer" ,
406403 SDP : peerConnection .LocalDescription ().SDP ,
407404 },
408- SDPStreamMetadata : c .Conf . GetRemoteMetadataForDevice (c .DeviceID ),
405+ SDPStreamMetadata : c .conf . Metadata . GetForDevice (c .DeviceID ),
409406 },
410407 }
411408 c .sendToDevice (event .CallAnswer , answerEvtContent )
412409}
413410
414411func (c * Call ) OnSelectAnswer (content * event.CallSelectAnswerEventContent ) {
415412 selectedPartyID := content .SelectedPartyID
416- if selectedPartyID != string (c .Client .DeviceID ) {
413+ if selectedPartyID != string (c .client .DeviceID ) {
417414 c .logger .WithField ("selected_party_id" , selectedPartyID ).Warn ("call was answered on a different device" )
418415 c .Terminate ()
419416 }
@@ -446,27 +443,31 @@ func (c *Call) Terminate() {
446443 c .logger .WithError (err ).Error ("error closing peer connection" )
447444 }
448445
449- c .Conf . Calls . CallsMu .Lock ()
450- delete (c .Conf . Calls .Calls , c .CallID )
451- c .Conf . Calls . CallsMu .Unlock ()
446+ c .conf . mutex .Lock ()
447+ delete (c .conf .Calls , c .CallID )
448+ c .conf . mutex .Unlock ()
452449
453- info := LocalTrackInfo {Call : c }
454- c .Conf .RemoveTracksFromPeerConnectionsByInfo (info )
455- c .Conf .RemoveTracksFromConfByInfo (info )
456- c .Conf .RemoveMetadataByDeviceID (c .DeviceID )
457- c .Conf .SendUpdatedMetadataFromCall (c .CallID )
450+ for _ , publisher := range c .Publishers {
451+ publisher .Stop ()
452+ }
453+
454+ for _ , subscriber := range c .Subscribers {
455+ subscriber .Unsubscribe ()
456+ }
457+
458+ c .conf .SendUpdatedMetadataFromCall (c .CallID )
458459}
459460
460461func (c * Call ) Hangup (reason event.CallHangupReason ) {
461462 hangupEvtContent := & event.Content {
462463 Parsed : event.CallHangupEventContent {
463464 BaseCallEventContent : event.BaseCallEventContent {
464465 CallID : c .CallID ,
465- ConfID : c .Conf .ConfID ,
466- DeviceID : c .Client .DeviceID ,
466+ ConfID : c .conf .ConfID ,
467+ DeviceID : c .client .DeviceID ,
467468 SenderSessionID : c .LocalSessionID ,
468469 DestSessionID : c .RemoteSessionID ,
469- PartyID : string (c .Client .DeviceID ),
470+ PartyID : string (c .client .DeviceID ),
470471 Version : event .CallVersion ("1" ),
471472 },
472473 Reason : reason ,
@@ -495,7 +496,7 @@ func (c *Call) sendToDevice(callType event.Type, content *event.Content) {
495496
496497 // TODO: E2EE
497498 // TODO: to-device reliability
498- if _ , err := c .Client .SendToDevice (callType , toDevice ); err != nil {
499+ if _ , err := c .client .SendToDevice (callType , toDevice ); err != nil {
499500 evtLogger .WithField ("content" , content ).WithError (err ).Error ("error sending to-device" )
500501 }
501502}
@@ -509,9 +510,11 @@ func (c *Call) SendDataChannelMessage(msg event.SFUMessage) {
509510 "op" : msg .Op ,
510511 })
511512
512- msg .Metadata = c .Conf .GetRemoteMetadataForDevice (c .DeviceID )
513- if msg .Op == "metadata" && len (msg .Metadata ) == 0 {
514- return
513+ if msg .Metadata == nil {
514+ msg .Metadata = c .conf .Metadata .GetForDevice (c .DeviceID )
515+ if msg .Op == event .SFUOperationMetadata && len (msg .Metadata ) == 0 {
516+ return
517+ }
515518 }
516519
517520 marshaled , err := json .Marshal (msg )
@@ -541,3 +544,17 @@ func (c *Call) CheckKeepAliveTimestamp() {
541544 }
542545 }
543546}
547+
548+ func (c * Call ) RemoveSubscriber (toDelete * Subscriber ) {
549+ newSubscribers := []* Subscriber {}
550+
551+ c .mutex .Lock ()
552+ for _ , subscriber := range c .Subscribers {
553+ if subscriber != toDelete {
554+ newSubscribers = append (newSubscribers , subscriber )
555+ }
556+ }
557+
558+ c .Subscribers = newSubscribers
559+ c .mutex .Unlock ()
560+ }
0 commit comments