Skip to content

Commit f9a3f82

Browse files
Merge pull request #30 from matrix-org/SimonBrandner/fix/double-unsubs
2 parents 07becdf + 41b4417 commit f9a3f82

File tree

3 files changed

+53
-24
lines changed

3 files changed

+53
-24
lines changed

src/call.go

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,26 +127,11 @@ func (c *Call) onDCPublish(sdp string) {
127127

128128
func (c *Call) onDCUnpublish(stop []event.SFUTrackDescription, sdp string) {
129129
for _, trackDesc := range stop {
130-
trackLogger := c.logger.WithFields(logrus.Fields{
131-
"track_id": trackDesc.TrackID,
132-
"stream_id": trackDesc.StreamID,
133-
})
134-
135-
trackLogger.Info("unpublishing track")
136-
137-
newPublishers := []*Publisher{}
138-
139-
c.mutex.Lock()
140130
for _, publisher := range c.Publishers {
141131
if publisher.Matches(trackDesc) {
142132
publisher.Stop()
143-
} else {
144-
newPublishers = append(newPublishers, publisher)
145133
}
146134
}
147-
148-
c.Publishers = newPublishers
149-
c.mutex.Unlock()
150135
}
151136

152137
err := c.PeerConnection.SetRemoteDescription(webrtc.SessionDescription{
@@ -300,11 +285,7 @@ func (c *Call) iceCandidateHandler(candidate *webrtc.ICECandidate) {
300285
}
301286

302287
func (c *Call) trackHandler(trackRemote *webrtc.TrackRemote) {
303-
publisher := NewPublisher(trackRemote, c)
304-
305-
c.mutex.Lock()
306-
c.Publishers = append(c.Publishers, publisher)
307-
c.mutex.Unlock()
288+
NewPublisher(trackRemote, c)
308289

309290
go c.conf.SendUpdatedMetadataFromCall(c.CallID)
310291
}
@@ -545,16 +526,40 @@ func (c *Call) CheckKeepAliveTimestamp() {
545526
}
546527
}
547528

548-
func (c *Call) RemoveSubscriber(toDelete *Subscriber) {
529+
func (c *Call) RemoveSubscriber(toDelete *Subscriber) bool {
530+
removed := false
549531
newSubscribers := []*Subscriber{}
550532

551533
c.mutex.Lock()
552534
for _, subscriber := range c.Subscribers {
553535
if subscriber != toDelete {
536+
removed = true
537+
} else {
554538
newSubscribers = append(newSubscribers, subscriber)
555539
}
556540
}
557541

558542
c.Subscribers = newSubscribers
559543
c.mutex.Unlock()
544+
545+
return removed
546+
}
547+
548+
func (c *Call) RemovePublisher(toDelete *Publisher) bool {
549+
removed := false
550+
newPublishers := []*Publisher{}
551+
552+
c.mutex.Lock()
553+
for _, publisher := range c.Publishers {
554+
if publisher == toDelete {
555+
removed = true
556+
} else {
557+
newPublishers = append(newPublishers, publisher)
558+
}
559+
}
560+
561+
c.Publishers = newPublishers
562+
c.mutex.Unlock()
563+
564+
return removed
560565
}

src/publisher.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func NewPublisher(
5353
"stream_id": track.StreamID(),
5454
})
5555

56+
call.mutex.Lock()
57+
call.Publishers = append(call.Publishers, publisher)
58+
call.mutex.Unlock()
59+
5660
go WriteRTCP(track, call.PeerConnection, publisher.logger)
5761
go publisher.WriteToSubscribers()
5862

@@ -68,10 +72,18 @@ func (p *Publisher) Subscribe(call *Call) {
6872
}
6973

7074
func (p *Publisher) Stop() {
75+
removed := p.Call.RemovePublisher(p)
76+
77+
if len(p.subscribers) == 0 && !removed {
78+
return
79+
}
80+
7181
for _, subscriber := range p.subscribers {
7282
subscriber.Unsubscribe()
7383
p.RemoveSubscriber(subscriber)
7484
}
85+
86+
p.logger.Info("unpublished track")
7587
}
7688

7789
func (p *Publisher) AddSubscriber(subscriber *Subscriber) {
@@ -83,15 +95,13 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) {
8395
func (p *Publisher) RemoveSubscriber(toDelete *Subscriber) {
8496
newSubscribers := []*Subscriber{}
8597

86-
p.mutex.RLock()
98+
p.mutex.Lock()
8799
for _, subscriber := range p.subscribers {
88100
if subscriber != toDelete {
89101
newSubscribers = append(newSubscribers, subscriber)
90102
}
91103
}
92-
p.mutex.RUnlock()
93104

94-
p.mutex.Lock()
95105
p.subscribers = newSubscribers
96106
p.mutex.Unlock()
97107
}

src/subscriber.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,16 @@ func NewSubscriber(call *Call) *Subscriber {
3939
subscriber.call = call
4040
subscriber.logger = call.logger
4141

42+
call.mutex.Lock()
43+
call.Subscribers = append(call.Subscribers, subscriber)
44+
call.mutex.Unlock()
45+
4246
return subscriber
4347
}
4448

4549
func (s *Subscriber) initLoggingWithTrack(track *webrtc.TrackRemote) {
50+
s.mutex.Lock()
51+
defer s.mutex.Unlock()
4652
s.logger = s.call.logger.WithFields(logrus.Fields{
4753
"track_id": (*track).ID(),
4854
"track_kind": (*track).Kind(),
@@ -79,6 +85,10 @@ func (s *Subscriber) Subscribe(publisher *Publisher) {
7985
}
8086

8187
func (s *Subscriber) Unsubscribe() {
88+
if s.publisher == nil {
89+
return
90+
}
91+
8292
if s.call.PeerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
8393
err := s.call.PeerConnection.RemoveTrack(s.sender)
8494
if err != nil {
@@ -88,5 +98,9 @@ func (s *Subscriber) Unsubscribe() {
8898

8999
s.call.RemoveSubscriber(s)
90100

101+
s.mutex.Lock()
102+
s.publisher = nil
103+
s.mutex.Unlock()
104+
91105
s.logger.Info("unsubscribed")
92106
}

0 commit comments

Comments
 (0)