Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions room.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type Room struct {

remoteParticipants map[livekit.ParticipantIdentity]*RemoteParticipant
sidToIdentity map[livekit.ParticipantID]livekit.ParticipantIdentity
sidDefers map[livekit.ParticipantID][]func(p *RemoteParticipant)
sidDefers map[livekit.ParticipantID]map[livekit.TrackID]func(p *RemoteParticipant)
metadata string
activeSpeakers []Participant
serverInfo *livekit.ServerInfo
Expand All @@ -176,7 +176,7 @@ func NewRoom(callback *RoomCallback) *Room {
engine: engine,
remoteParticipants: make(map[livekit.ParticipantIdentity]*RemoteParticipant),
sidToIdentity: make(map[livekit.ParticipantID]livekit.ParticipantIdentity),
sidDefers: make(map[livekit.ParticipantID][]func(*RemoteParticipant)),
sidDefers: make(map[livekit.ParticipantID]map[livekit.TrackID]func(*RemoteParticipant)),
callback: NewRoomCallback(),
sidReady: make(chan struct{}),
connectionState: ConnectionStateDisconnected,
Expand Down Expand Up @@ -343,7 +343,9 @@ func (r *Room) JoinWithToken(url, token string, opts ...ConnectOption) error {
r.LocalParticipant.updateSubscriptionPermission()

for _, pi := range joinRes.OtherParticipants {
r.addRemoteParticipant(pi, true)
rp := r.addRemoteParticipant(pi, true)
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
r.runParticipantDefers(livekit.ParticipantID(pi.Sid), rp)
}

return nil
Expand All @@ -370,29 +372,49 @@ func (r *Room) setConnectionState(cs ConnectionState) {
r.lock.Unlock()
}

func (r *Room) deferParticipantUpdate(sid livekit.ParticipantID, fnc func(p *RemoteParticipant)) {
func (r *Room) deferParticipantUpdate(sid livekit.ParticipantID, trackID livekit.TrackID, fnc func(p *RemoteParticipant)) {
r.lock.Lock()
defer r.lock.Unlock()
r.sidDefers[sid] = append(r.sidDefers[sid], fnc)

if r.sidDefers[sid] == nil {
r.sidDefers[sid] = make(map[livekit.TrackID]func(p *RemoteParticipant))
}
r.sidDefers[sid][trackID] = fnc
}

func (r *Room) runParticipantDefers(sid livekit.ParticipantID, p *RemoteParticipant) {
r.lock.RLock()
has := len(r.sidDefers[sid]) != 0
r.lock.RUnlock()
if !has {
return
}
r.lock.Lock()
fncs := r.sidDefers[sid]
delete(r.sidDefers, sid)
r.lock.Unlock()
if len(fncs) == 0 {
return

if len(fncs) != 0 {
r.log.Infow("running deferred updates for participant", "participantID", sid, "updates", len(fncs))
for _, fnc := range fncs {
fnc(p)
}
}
r.log.Infow("running deferred updates for participant", "participantID", sid, "updates", len(fncs))
for _, fnc := range fncs {
fnc(p)
}

func (r *Room) clearParticipantDefers(sid livekit.ParticipantID, pi *livekit.ParticipantInfo) {
r.lock.Lock()
defer r.lock.Unlock()

for trackID := range r.sidDefers[sid] {
found := false
for _, ti := range pi.Tracks {
if livekit.TrackID(ti.GetSid()) == trackID {
found = true
break
}
}
if !found {
r.log.Infow("deleting deferred update for participant", "participantID", sid, "trackID", trackID)
delete(r.sidDefers[sid], trackID)
if len(r.sidDefers[sid]) == 0 {
delete(r.sidDefers, sid)
}
}
}
}

Expand Down Expand Up @@ -483,11 +505,17 @@ func (r *Room) handleMediaTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPR

rp := r.GetParticipantBySID(participantID)
if rp == nil {
r.log.Infow("could not find participant, deferring track update", "participantID", participantID)
r.deferParticipantUpdate(livekit.ParticipantID(participantID), update)
r.log.Infow(
"could not find participant, deferring track update",
"participantID", participantID,
"trackID", trackID,
"streamID", streamID,
)
r.deferParticipantUpdate(livekit.ParticipantID(participantID), livekit.TrackID(trackID), update)
return
}
update(rp)
r.runParticipantDefers(livekit.ParticipantID(participantID), rp)
}

func (r *Room) handleDisconnect(reason DisconnectionReason) {
Expand Down Expand Up @@ -573,6 +601,8 @@ func (r *Room) handleParticipantUpdate(participants []*livekit.ParticipantInfo)
}
} else if isNew {
rp = r.addRemoteParticipant(pi, true)
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
r.runParticipantDefers(livekit.ParticipantID(pi.Sid), rp)
go r.callback.OnParticipantConnected(rp)
} else {
oldSid := livekit.ParticipantID(rp.SID())
Expand All @@ -584,8 +614,9 @@ func (r *Room) handleParticipantUpdate(participants []*livekit.ParticipantInfo)
delete(r.sidToIdentity, oldSid)
r.sidToIdentity[newSid] = livekit.ParticipantIdentity(rp.Identity())
r.lock.Unlock()
r.runParticipantDefers(newSid, rp)
}
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
r.runParticipantDefers(newSid, rp)
}
}
}
Expand Down