Skip to content

Commit e42b67a

Browse files
authored
Make sure EOS reaches sinks (livekit#408)
* Make sure EOS reaches sinks New sample callback (app sink callback) is doing push to a blocking queue and in case we are e.g blocked inside output writeSample - inside WaitForMediaTime , there is nothing to pump the data out and the app sink thread gets stuck on the blocking push, not being able to process EOS on time The change singnals EOS seen on the source out of band - so the output gets a chance to decide to wait a bit for remaining data to go through but also to cancel and close if it takes too much. This should prevent pipeline frozen scenario (where EOS doesn't reach the sink). Eventually we can get rid of the existing workaround - but leaving it just as a safty net for now. * tidy logging
1 parent 4354eae commit e42b67a

File tree

5 files changed

+97
-13
lines changed

5 files changed

+97
-13
lines changed

pkg/media/eos_dispatcher.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package media
2+
3+
import "sync"
4+
5+
// eosDispatcher broadcasts a single EOS signal to any listener that registers,
6+
// replaying the signal to late subscribers if it already fired.
7+
type eosDispatcher struct {
8+
mu sync.Mutex
9+
fired bool
10+
listeners []func()
11+
}
12+
13+
func newEOSDispatcher() *eosDispatcher {
14+
return &eosDispatcher{}
15+
}
16+
17+
func (d *eosDispatcher) Fire() {
18+
d.mu.Lock()
19+
if d.fired {
20+
d.mu.Unlock()
21+
return
22+
}
23+
d.fired = true
24+
listeners := append([]func(){}, d.listeners...)
25+
d.listeners = nil
26+
d.mu.Unlock()
27+
28+
for _, l := range listeners {
29+
go l()
30+
}
31+
}
32+
33+
func (d *eosDispatcher) AddListener(f func()) {
34+
d.mu.Lock()
35+
if d.fired {
36+
d.mu.Unlock()
37+
// Replay immediately if EOS already fired
38+
go f()
39+
return
40+
}
41+
d.listeners = append(d.listeners, f)
42+
d.mu.Unlock()
43+
}

pkg/media/input.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ type Input struct {
7171
gateOffset time.Duration
7272
gateTerminator sync.Once
7373
latencyCaps *gst.Caps
74+
75+
onEOS func()
7476
}
7577

7678
type OutputReadyFunc func(pad *gst.Pad, kind types.StreamKind)
@@ -143,8 +145,16 @@ func (i *Input) OnOutputReady(f OutputReadyFunc) {
143145
i.onOutputReady = f
144146
}
145147

148+
func (i *Input) SetOnEOS(f func()) {
149+
i.onEOS = f
150+
}
151+
146152
func (i *Input) Start(ctx context.Context, onCloseTimeout func(ctx context.Context)) error {
147153
return i.source.Start(ctx, func() {
154+
if i.onEOS != nil {
155+
i.onEOS()
156+
}
157+
148158
go func() {
149159
t := time.NewTimer(5 * time.Second)
150160
select {

pkg/media/output.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const (
4141

4242
queueCapacity = 5
4343
latencySampleInterval = 500 * time.Millisecond
44+
eosQueueDrainTimeout = 2 * time.Second
4445
)
4546

4647
// Output manages GStreamer elements that converts & encodes video to the specification that's
@@ -56,6 +57,7 @@ type Output struct {
5657
isPlayingTooSlow func() bool
5758
trackStatsGatherer *stats.MediaTrackStatGatherer
5859
queue *utils.BlockingQueue[*sample]
60+
eos *eosDispatcher
5961

6062
localTrack atomic.Pointer[lksdk_output.LocalTrack]
6163
stopDropping func()
@@ -84,8 +86,8 @@ type AudioOutput struct {
8486
codec livekit.AudioCodec
8587
}
8688

87-
func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, statsGatherer *stats.LocalMediaStatsGatherer) (*VideoOutput, error) {
88-
e, err := newVideoOutput(codec, outputSync, isPlayingTooSlow)
89+
func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, statsGatherer *stats.LocalMediaStatsGatherer, eos *eosDispatcher) (*VideoOutput, error) {
90+
e, err := newVideoOutput(codec, outputSync, isPlayingTooSlow, eos)
8991
if err != nil {
9092
return nil, err
9193
}
@@ -248,8 +250,8 @@ func NewVideoOutput(codec livekit.VideoCodec, layer *livekit.VideoLayer, outputS
248250
return e, nil
249251
}
250252

251-
func NewAudioOutput(options *livekit.IngressAudioEncodingOptions, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, statsGatherer *stats.LocalMediaStatsGatherer) (*AudioOutput, error) {
252-
e, err := newAudioOutput(options.AudioCodec, outputSync, isPlayingTooSlow)
253+
func NewAudioOutput(options *livekit.IngressAudioEncodingOptions, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, statsGatherer *stats.LocalMediaStatsGatherer, eos *eosDispatcher) (*AudioOutput, error) {
254+
e, err := newAudioOutput(options.AudioCodec, outputSync, isPlayingTooSlow, eos)
253255
if err != nil {
254256
return nil, err
255257
}
@@ -354,8 +356,8 @@ func NewAudioOutput(options *livekit.IngressAudioEncodingOptions, outputSync *ut
354356
return e, nil
355357
}
356358

357-
func newVideoOutput(codec livekit.VideoCodec, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool) (*VideoOutput, error) {
358-
e, err := newOutput(outputSync, isPlayingTooSlow)
359+
func newVideoOutput(codec livekit.VideoCodec, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, eos *eosDispatcher) (*VideoOutput, error) {
360+
e, err := newOutput(outputSync, isPlayingTooSlow, eos)
359361
if err != nil {
360362
return nil, err
361363
}
@@ -373,8 +375,8 @@ func newVideoOutput(codec livekit.VideoCodec, outputSync *utils.TrackOutputSynch
373375
return o, nil
374376
}
375377

376-
func newAudioOutput(codec livekit.AudioCodec, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool) (*AudioOutput, error) {
377-
e, err := newOutput(outputSync, isPlayingTooSlow)
378+
func newAudioOutput(codec livekit.AudioCodec, outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, eos *eosDispatcher) (*AudioOutput, error) {
379+
e, err := newOutput(outputSync, isPlayingTooSlow, eos)
378380
if err != nil {
379381
return nil, err
380382
}
@@ -392,7 +394,7 @@ func newAudioOutput(codec livekit.AudioCodec, outputSync *utils.TrackOutputSynch
392394
return o, nil
393395
}
394396

395-
func newOutput(outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool) (*Output, error) {
397+
func newOutput(outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func() bool, eos *eosDispatcher) (*Output, error) {
396398
sink, err := app.NewAppSink()
397399
if err != nil {
398400
return nil, err
@@ -403,6 +405,11 @@ func newOutput(outputSync *utils.TrackOutputSynchronizer, isPlayingTooSlow func(
403405
sink: sink,
404406
outputSync: outputSync,
405407
isPlayingTooSlow: isPlayingTooSlow,
408+
eos: eos,
409+
}
410+
411+
if e.eos != nil {
412+
e.eos.AddListener(e.onSourceEOS)
406413
}
407414

408415
e.start()
@@ -512,6 +519,7 @@ func (e *Output) QueueLength() int {
512519
}
513520

514521
func (e *Output) Close() error {
522+
e.logger.Debugw("closing output")
515523

516524
e.closed.Break()
517525
e.outputSync.Close()
@@ -520,6 +528,22 @@ func (e *Output) Close() error {
520528
return nil
521529
}
522530

531+
func (e *Output) onSourceEOS() {
532+
e.logger.Debugw("eos received, eventually closing queue after timeout")
533+
go func() {
534+
timer := time.NewTimer(eosQueueDrainTimeout)
535+
defer timer.Stop()
536+
537+
select {
538+
case <-timer.C:
539+
e.Close()
540+
e.logger.Debugw("output closed on eos timeout")
541+
case <-e.closed.Watch():
542+
// already closed as a result of handling EOS in-band
543+
}
544+
}()
545+
}
546+
523547
func (e *VideoOutput) handleSample(sink *app.Sink) gst.FlowReturn {
524548
// Return an error if the last write failed
525549
if errPtr := e.pipelineErr.Load(); errPtr != nil {

pkg/media/pipeline.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type Pipeline struct {
5252
cancel atomic.Pointer[context.CancelFunc]
5353

5454
pipelineErr chan error
55+
56+
eos *eosDispatcher
5557
}
5658

5759
func New(ctx context.Context, conf *config.Config, params *params.Params, g *stats.LocalMediaStatsGatherer) (*Pipeline, error) {
@@ -83,8 +85,11 @@ func New(ctx context.Context, conf *config.Config, params *params.Params, g *sta
8385
pipeline: pipeline,
8486
input: input,
8587
pipelineErr: make(chan error, 1),
88+
eos: newEOSDispatcher(),
8689
}
8790

91+
input.SetOnEOS(p.eos.Fire)
92+
8893
sink, err := NewWebRTCSink(ctx, params, func() {
8994
if cancel := p.cancel.Load(); cancel != nil {
9095
(*cancel)()
@@ -93,7 +98,7 @@ func New(ctx context.Context, conf *config.Config, params *params.Params, g *sta
9398
if p.loop != nil {
9499
p.loop.Quit()
95100
}
96-
}, g)
101+
}, g, p.eos)
97102
if err != nil {
98103
return nil, err
99104
}

pkg/media/webrtc_sink.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,14 @@ type WebRTCSink struct {
5454
outputSync *utils.OutputSynchronizer
5555
spliceProcessor *SpliceProcessor
5656
statsGatherer *stats.LocalMediaStatsGatherer
57+
eos *eosDispatcher
5758

5859
// logging
5960
tooSlowThrottle core.Throttle
6061
tooSlowLogEvents atomic.Int32
6162
}
6263

63-
func NewWebRTCSink(ctx context.Context, p *params.Params, onFailure func(), statsGatherer *stats.LocalMediaStatsGatherer) (*WebRTCSink, error) {
64+
func NewWebRTCSink(ctx context.Context, p *params.Params, onFailure func(), statsGatherer *stats.LocalMediaStatsGatherer, eos *eosDispatcher) (*WebRTCSink, error) {
6465
ctx, span := tracer.Start(ctx, "media.NewWebRTCSink")
6566
defer span.End()
6667

@@ -70,6 +71,7 @@ func NewWebRTCSink(ctx context.Context, p *params.Params, onFailure func(), stat
7071
errChan: make(chan error),
7172
outputSync: utils.NewOutputSynchronizer(),
7273
statsGatherer: statsGatherer,
74+
eos: eos,
7375
tooSlowThrottle: core.NewThrottle(5 * time.Second),
7476
}
7577

@@ -104,7 +106,7 @@ func NewWebRTCSink(ctx context.Context, p *params.Params, onFailure func(), stat
104106
}
105107

106108
func (s *WebRTCSink) addAudioTrack() (*Output, error) {
107-
output, err := NewAudioOutput(s.params.AudioEncodingOptions, s.outputSync.AddTrack(), s.isPlayingTooSlow, s.statsGatherer)
109+
output, err := NewAudioOutput(s.params.AudioEncodingOptions, s.outputSync.AddTrack(), s.isPlayingTooSlow, s.statsGatherer, s.eos)
108110
if err != nil {
109111
logger.Errorw("could not create output", err)
110112
return nil, err
@@ -190,7 +192,7 @@ func (s *WebRTCSink) addVideoTrack(w, h int) ([]*Output, error) {
190192
sortedLayers := filterAndSortLayersByQuality(s.params.VideoEncodingOptions.Layers, w, h)
191193

192194
for _, layer := range sortedLayers {
193-
output, err := NewVideoOutput(s.params.VideoEncodingOptions.VideoCodec, layer, s.outputSync.AddTrack(), s.isPlayingTooSlow, s.statsGatherer)
195+
output, err := NewVideoOutput(s.params.VideoEncodingOptions.VideoCodec, layer, s.outputSync.AddTrack(), s.isPlayingTooSlow, s.statsGatherer, s.eos)
194196
if err != nil {
195197
return nil, err
196198
}

0 commit comments

Comments
 (0)