Skip to content

Commit 01419b0

Browse files
authored
Address publish race. (#556)
With a single channel to receive track published response, responses could have been mixed up with requests. Demux using request Cid. For #555
1 parent 54b2212 commit 01419b0

File tree

2 files changed

+50
-26
lines changed

2 files changed

+50
-26
lines changed

engine.go

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,22 @@ const (
3737
)
3838

3939
type RTCEngine struct {
40-
log protoLogger.Logger
41-
pclock sync.Mutex
42-
publisher *PCTransport
43-
subscriber *PCTransport
44-
client *SignalClient
45-
dclock sync.RWMutex
46-
reliableDC *webrtc.DataChannel
47-
lossyDC *webrtc.DataChannel
48-
reliableDCSub *webrtc.DataChannel
49-
lossyDCSub *webrtc.DataChannel
50-
trackPublishedChan chan *livekit.TrackPublishedResponse
40+
log protoLogger.Logger
41+
42+
pclock sync.Mutex
43+
publisher *PCTransport
44+
subscriber *PCTransport
45+
client *SignalClient
46+
47+
dclock sync.RWMutex
48+
reliableDC *webrtc.DataChannel
49+
lossyDC *webrtc.DataChannel
50+
reliableDCSub *webrtc.DataChannel
51+
lossyDCSub *webrtc.DataChannel
52+
53+
trackPublishedListenersLock sync.Mutex
54+
trackPublishedListeners map[string]chan *livekit.TrackPublishedResponse
55+
5156
subscriberPrimary bool
5257
hasConnected atomic.Bool
5358
hasPublish atomic.Bool
@@ -79,10 +84,10 @@ type RTCEngine struct {
7984

8085
func NewRTCEngine() *RTCEngine {
8186
e := &RTCEngine{
82-
log: logger,
83-
client: NewSignalClient(),
84-
trackPublishedChan: make(chan *livekit.TrackPublishedResponse, 1),
85-
JoinTimeout: 15 * time.Second,
87+
log: logger,
88+
client: NewSignalClient(),
89+
trackPublishedListeners: make(map[string]chan *livekit.TrackPublishedResponse),
90+
JoinTimeout: 15 * time.Second,
8691
}
8792

8893
e.client.OnParticipantUpdate = func(info []*livekit.ParticipantInfo) {
@@ -207,10 +212,6 @@ func (e *RTCEngine) Subscriber() (*PCTransport, bool) {
207212
return e.subscriber, e.subscriber != nil
208213
}
209214

210-
func (e *RTCEngine) TrackPublishedChan() <-chan *livekit.TrackPublishedResponse {
211-
return e.trackPublishedChan
212-
}
213-
214215
func (e *RTCEngine) setRTT(rtt uint32) {
215216
if subscriber, ok := e.Subscriber(); ok {
216217
subscriber.SetRTT(rtt)
@@ -472,8 +473,26 @@ func (e *RTCEngine) dataPubChannelReady() bool {
472473
return e.reliableDC.ReadyState() == webrtc.DataChannelStateOpen && e.lossyDC.ReadyState() == webrtc.DataChannelStateOpen
473474
}
474475

476+
func (e *RTCEngine) RegisterTrackPublishedListener(cid string, c chan *livekit.TrackPublishedResponse) {
477+
e.trackPublishedListenersLock.Lock()
478+
e.trackPublishedListeners[cid] = c
479+
e.trackPublishedListenersLock.Unlock()
480+
}
481+
482+
func (e *RTCEngine) UnregisterTrackPublishedListener(cid string) {
483+
e.trackPublishedListenersLock.Lock()
484+
delete(e.trackPublishedListeners, cid)
485+
e.trackPublishedListenersLock.Unlock()
486+
}
487+
475488
func (e *RTCEngine) handleLocalTrackPublished(res *livekit.TrackPublishedResponse) {
476-
e.trackPublishedChan <- res
489+
e.trackPublishedListenersLock.Lock()
490+
listener, ok := e.trackPublishedListeners[res.Cid]
491+
e.trackPublishedListenersLock.Unlock()
492+
493+
if ok {
494+
listener <- res
495+
}
477496
}
478497

479498
func (e *RTCEngine) handleLocalTrackUnpublished(res *livekit.TrackUnpublishedResponse) {

localparticipant.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,24 +107,26 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl
107107

108108
publisher.Negotiate()
109109

110-
pubChan := p.engine.TrackPublishedChan()
111-
var pubRes *livekit.TrackPublishedResponse
110+
pubChan := make(chan *livekit.TrackPublishedResponse, 1)
111+
p.engine.RegisterTrackPublishedListener(track.ID(), pubChan)
112112

113+
var pubRes *livekit.TrackPublishedResponse
113114
select {
114115
case pubRes = <-pubChan:
115116
break
116117
case <-time.After(trackPublishTimeout):
118+
p.engine.UnregisterTrackPublishedListener(track.ID())
117119
return nil, ErrTrackPublishTimeout
118120
}
119121

122+
p.engine.UnregisterTrackPublishedListener(track.ID())
120123
pub.updateInfo(pubRes.Track)
121124
p.addPublication(pub)
122125

123126
p.Callback.OnLocalTrackPublished(pub, p)
124127
p.roomCallback.OnLocalTrackPublished(pub, p)
125128

126129
p.engine.log.Infow("published track", "name", opts.Name, "source", opts.Source.String(), "trackID", pubRes.Track.Sid)
127-
128130
return pub, nil
129131
}
130132

@@ -191,16 +193,19 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra
191193
return nil, err
192194
}
193195

194-
pubChan := p.engine.TrackPublishedChan()
195-
var pubRes *livekit.TrackPublishedResponse
196+
pubChan := make(chan *livekit.TrackPublishedResponse, 1)
197+
p.engine.RegisterTrackPublishedListener(mainTrack.ID(), pubChan)
196198

199+
var pubRes *livekit.TrackPublishedResponse
197200
select {
198201
case pubRes = <-pubChan:
199202
break
200203
case <-time.After(trackPublishTimeout):
204+
p.engine.UnregisterTrackPublishedListener(mainTrack.ID())
201205
return nil, ErrTrackPublishTimeout
202206
}
203207

208+
p.engine.UnregisterTrackPublishedListener(mainTrack.ID())
204209
publisher, ok := p.engine.Publisher()
205210
if !ok {
206211
return nil, ErrNoPeerConnection
@@ -428,7 +433,7 @@ func (p *LocalParticipant) UnpublishTrack(sid string) error {
428433
p.Callback.OnLocalTrackUnpublished(pub, p)
429434
p.roomCallback.OnLocalTrackUnpublished(pub, p)
430435

431-
p.engine.log.Infow("unpublished track", "name", pub.Name(), "sid", sid)
436+
p.engine.log.Infow("unpublished track", "name", pub.Name(), "trackID", sid)
432437

433438
return err
434439
}

0 commit comments

Comments
 (0)