Skip to content

Commit ed0299a

Browse files
committed
Allow WHEP Players to connect before WHIP is publishing
1 parent a74796b commit ed0299a

File tree

7 files changed

+104
-42
lines changed

7 files changed

+104
-42
lines changed

internal/webrtc/sessions/manager/manager.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,26 @@ func (m *SessionManager) GetWHEPSessionByID(sessionID string) (whep *whep.WHEPSe
168168
return whepSession, foundSession
169169
}
170170

171+
func (m *SessionManager) SendPLIByWHEPSessionID(sessionID string) {
172+
streamSession, _, foundSession := m.GetSessionAndWHEPByID(sessionID)
173+
if !foundSession {
174+
log.Println("SessionManager.SendPLIByWHEPSessionID: WHEP session not found", sessionID)
175+
return
176+
}
177+
178+
host := streamSession.Host.Load()
179+
if host == nil {
180+
log.Println(
181+
"SessionManager.SendPLIByWHEPSessionID: WHIP session not found",
182+
"whepSessionID", sessionID,
183+
"streamKey", streamSession.StreamKey,
184+
)
185+
return
186+
}
187+
188+
host.SendPLI()
189+
}
190+
171191
func (m *SessionManager) GetSessionAndWHEPByID(sessionID string) (streamSession *session.Session, whepSession *whep.WHEPSession, foundSession bool) {
172192
m.sessionsLock.RLock()
173193
defer m.sessionsLock.RUnlock()

internal/webrtc/sessions/session/session.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,25 @@ func (session *Session) SetOnClose(onClose func()) {
2525
session.onClose = onClose
2626
}
2727

28-
// Add WHEP session to existing WHIP session
29-
func (s *Session) AddWHEP(whepSessionID string, peerConnection *webrtc.PeerConnection, audioTrack *codecs.TrackMultiCodec, videoTrack *codecs.TrackMultiCodec, videoRTCPSender *webrtc.RTPSender) (err error) {
28+
// Add WHEP viewer session
29+
func (s *Session) AddWHEP(whepSessionID string, peerConnection *webrtc.PeerConnection, audioTrack *codecs.TrackMultiCodec, videoTrack *codecs.TrackMultiCodec, videoRTCPSender *webrtc.RTPSender, pliSender func()) (err error) {
3030
log.Println("WHIPSessionManager.WHIPSession.AddWHEPSession")
3131

32-
host := s.Host.Load()
33-
if host == nil {
34-
return fmt.Errorf("no host was found on the current session")
35-
}
36-
3732
whepSession := whep.CreateNewWHEP(
3833
whepSessionID,
3934
audioTrack,
40-
host.GetHighestPrioritizedAudioTrack(),
4135
videoTrack,
42-
host.GetHighestPrioritizedVideoTrack(),
4336
peerConnection,
44-
host.SendPLI)
37+
pliSender,
38+
)
4539

46-
whepSession.RegisterWHEPHandlers(peerConnection)
40+
whepSession.SetOnClose(s.handleWHEPClose)
4741

4842
s.WHEPSessionsLock.Lock()
4943
s.WHEPSessions[whepSessionID] = whepSession
5044
s.WHEPSessionsLock.Unlock()
5145
s.updateHostWHEPSessionsSnapshot()
52-
whepSession.SetOnClose(s.handleWHEPClose)
46+
whepSession.RegisterWHEPHandlers(peerConnection)
5347
go s.handleWHEPVideoRTCPSender(whepSession, videoRTCPSender)
5448

5549
return nil

internal/webrtc/sessions/whep/whep.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ import (
1212
func CreateNewWHEP(
1313
whepSessionID string,
1414
audioTrack *codecs.TrackMultiCodec,
15-
audioLayer string,
1615
videoTrack *codecs.TrackMultiCodec,
17-
videoLayer string,
1816
peerConnection *webrtc.PeerConnection,
1917
pliSender func(),
2018
) (w *WHEPSession) {
@@ -31,10 +29,8 @@ func CreateNewWHEP(
3129
videoBitrateWindowStart: time.Now(),
3230
}
3331

34-
log.Println("WHEPSession.CreateNewWHEP.AudioLayer", audioLayer)
35-
log.Println("WHEPSession.CreateNewWHEP.VideoLayer", videoLayer)
36-
w.AudioLayerCurrent.Store(audioLayer)
37-
w.VideoLayerCurrent.Store(videoLayer)
32+
w.AudioLayerCurrent.Store("")
33+
w.VideoLayerCurrent.Store("")
3834
w.IsWaitingForKeyframe.Store(true)
3935
w.IsSessionClosed.Store(false)
4036
return w
@@ -106,15 +102,15 @@ func (w *WHEPSession) GetWHEPSessionStatus() (state SessionState) {
106102
return
107103
}
108104

109-
// Finds the corresponding WHIP session to the WHEP session id and sets the requested audio layer
105+
// Sets the requested audio layer for this WHEP session.
110106
func (w *WHEPSession) SetAudioLayer(encodingID string) {
111107
log.Println("Setting Audio Layer")
112108
w.AudioLayerCurrent.Store(encodingID)
113109
w.IsWaitingForKeyframe.Store(true)
114110
w.SendPLI()
115111
}
116112

117-
// Finds the corresponding WHIP session to the WHEP session id and sets the requested video layer
113+
// Sets the requested video layer for this WHEP session.
118114
func (w *WHEPSession) SetVideoLayer(encodingID string) {
119115
log.Println("Setting Video Layer")
120116
w.VideoLayerCurrent.Store(encodingID)
@@ -127,10 +123,9 @@ func (w *WHEPSession) SendPLI() {
127123
return
128124
}
129125

130-
if w.pliSender != nil {
131-
w.pliSender()
132-
}
126+
w.pliSender()
133127
}
128+
134129
func (w *WHEPSession) updateVideoBitrateLocked(now time.Time) {
135130
if w.videoBitrateWindowStart.IsZero() {
136131
w.videoBitrateWindowStart = now
@@ -151,3 +146,30 @@ func (w *WHEPSession) updateVideoBitrateLocked(now time.Time) {
151146
w.videoBitrateWindowStart = now
152147
w.videoBitrateWindowBytes = w.VideoBytesWritten
153148
}
149+
150+
func (w *WHEPSession) GetAudioLayerOrDefault(defaultLayer string) string {
151+
w.AudioLock.Lock()
152+
defer w.AudioLock.Unlock()
153+
154+
currentLayer, _ := w.AudioLayerCurrent.Load().(string)
155+
if currentLayer != "" {
156+
return currentLayer
157+
}
158+
159+
w.AudioLayerCurrent.Store(defaultLayer)
160+
return defaultLayer
161+
}
162+
163+
func (w *WHEPSession) GetVideoLayerOrDefault(defaultLayer string) string {
164+
w.VideoLock.Lock()
165+
defer w.VideoLock.Unlock()
166+
167+
currentLayer, _ := w.VideoLayerCurrent.Load().(string)
168+
if currentLayer != "" {
169+
return currentLayer
170+
}
171+
172+
w.VideoLayerCurrent.Store(defaultLayer)
173+
w.IsWaitingForKeyframe.Store(true)
174+
return defaultLayer
175+
}

internal/webrtc/sessions/whip/writers.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (w *WHIPSession) AudioWriter(remoteTrack *webrtc.TrackRemote, streamKey str
6666
}
6767

6868
for _, whepSession := range sessions {
69-
if whepSession.AudioLayerCurrent.Load() == id {
69+
if whepSession.GetAudioLayerOrDefault(id) == id {
7070
whepSession.SendAudioPacket(packet)
7171
}
7272
}
@@ -181,7 +181,8 @@ func (w *WHIPSession) VideoWriter(remoteTrack *webrtc.TrackRemote, streamKey str
181181
sessions = sessionsAny.(map[string]*whep.WHEPSession)
182182
}
183183

184-
sendVideoPacketToWHEP(id,
184+
sendVideoPacketToWHEP(
185+
id,
185186
sessions,
186187
codecs.TrackPacket{
187188
Layer: id,
@@ -190,15 +191,18 @@ func (w *WHIPSession) VideoWriter(remoteTrack *webrtc.TrackRemote, streamKey str
190191
IsKeyframe: isKeyframe,
191192
TimeDiff: timeDiff,
192193
SequenceDiff: sequenceDiff,
193-
})
194+
},
195+
)
194196
}
195197
}
196198

197199
func sendVideoPacketToWHEP(id string, sessions map[string]*whep.WHEPSession, packet codecs.TrackPacket) {
198200
for _, whepSession := range sessions {
199-
if whepSession.VideoLayerCurrent.Load() == id {
200-
whepSession.SendVideoPacket(packet)
201+
if whepSession.GetVideoLayerOrDefault(id) != id {
202+
continue
201203
}
204+
205+
whepSession.SendVideoPacket(packet)
202206
}
203207
}
204208

internal/webrtc/webrtc.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ func HandleWHIPDelete(sessionID string) error {
9090
return errors.New("no session found")
9191
}
9292

93-
session.Close()
93+
session.RemoveHost()
94+
if session.GetStreamStatus().ViewerCount == 0 {
95+
session.Close()
96+
}
97+
9498
return nil
9599
}
96100

internal/webrtc/whep.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,16 @@ func WHEP(offer string, streamKey string) (string, string, error) {
6060
}
6161

6262
// TODO: Should this be before gatherComplete to assure registered events are triggered at correct time?
63-
if err := session.AddWHEP(whepSessionID, peerConnection, audioTrack, videoTrack, videoRTCPSender); err != nil {
63+
if err := session.AddWHEP(
64+
whepSessionID,
65+
peerConnection,
66+
audioTrack,
67+
videoTrack,
68+
videoRTCPSender,
69+
func() {
70+
manager.SessionsManager.SendPLIByWHEPSessionID(whepSessionID)
71+
},
72+
); err != nil {
6473
return "", "", err
6574
}
6675

web/src/components/player/Player.tsx

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,27 @@ const Player = (props: PlayerProps) => {
4949
layerEndpointRef: layerEndpointRef,
5050
onStateChange: (state) => console.log("PeerConnection.onStateChange", state),
5151
onStreamRestart: () => console.log("PeerConnection.onStreamRestart: Missing setup"),
52-
onAudioLayerChange: (layers) => setAudioLayers(layers),
53-
onVideoLayerChange: (layers) => setVideoLayers(layers),
54-
onLayerStatus: (status) => setCurrentLayersStatus(status),
55-
onStreamStatus: (status) => {
56-
if (!status.isOnline) {
57-
setStreamState("Offline")
58-
}
59-
setCurrentStreamStatus(() => status)
60-
},
61-
onError: () => setStreamState("Error"),
62-
}
52+
onAudioLayerChange: (layers) => setAudioLayers(layers),
53+
onVideoLayerChange: (layers) => setVideoLayers(layers),
54+
onLayerStatus: (status) => setCurrentLayersStatus(status),
55+
onStreamStatus: (status) => {
56+
setCurrentStreamStatus(() => status)
57+
58+
if (!status.isOnline) {
59+
setStreamState("Offline")
60+
return
61+
}
62+
63+
const videoElement = videoRef.current
64+
if (videoElement !== null && !videoElement.paused && videoElement.readyState >= HTMLMediaElement.HAVE_CURRENT_DATA) {
65+
setStreamState("Playing")
66+
return
67+
}
68+
69+
setStreamState("Loading")
70+
},
71+
onError: () => setStreamState("Error"),
72+
}
6373

6474
const resetTimer = (isVisible: boolean) => {
6575
setVideoOverlayVisible(() => isVisible);
@@ -103,7 +113,6 @@ const Player = (props: PlayerProps) => {
103113
player?.addEventListener('mouseleave', () => handleOverlayTimer(false))
104114

105115
peerConnectionConfig.onStreamRestart = () => {
106-
// setCurrentLayersStatus(undefined)
107116
setResetCounter((prev) => prev + 1)
108117

109118
PeerConnectionSetup(peerConnectionConfig)

0 commit comments

Comments
 (0)