Skip to content

Commit ef418da

Browse files
authored
Split out probing and keyframe checks (#1409)
We most likely want to check more segments for the keyframe issue to be on the safe side. While we don't want to add a bunch more processing time by performing the other probing checks on those segments too, so these checks are now separated out.
1 parent 3683288 commit ef418da

File tree

2 files changed

+59
-10
lines changed

2 files changed

+59
-10
lines changed

pipeline/ffmpeg.go

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func (f *ffmpeg) handleStartUploadJob(job *JobInfo, reencodeSegmentation bool) (
183183
if err != nil {
184184
return nil, err
185185
}
186+
log.Log(job.RequestID, "Finished probing source segments")
186187

187188
outputs, transcodedSegments, err := transcode.RunTranscodeProcess(transcodeRequest, job.StreamName, inputInfo, f.Broadcaster)
188189
if err != nil {
@@ -308,25 +309,60 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS
308309
if job.InputFileInfo.Format == "hls" {
309310
return nil
310311
}
312+
const (
313+
keyframeCheckSegments = 12
314+
fullProbeSegments = 4
315+
)
316+
317+
// First, ensure the first frame of more segments is a keyframe.
318+
if err := f.probeSegmentsWith(job, sourceSegments, keyframeCheckSegments, f.checkFirstFrame); err != nil {
319+
return err
320+
}
321+
322+
// Then, run the heavier probing logic on a smaller subset.
323+
if err := f.probeSegmentsWith(job, sourceSegments, fullProbeSegments, f.probeSourceSegment); err != nil {
324+
return err
325+
}
326+
327+
return nil
328+
}
329+
330+
func (f *ffmpeg) probeSegmentsWith(
331+
job *JobInfo,
332+
sourceSegments []*m3u8.MediaSegment,
333+
n int,
334+
probeFn func(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error,
335+
) error {
311336
segCount := len(sourceSegments)
312-
if segCount < 6 {
313-
for _, segment := range sourceSegments {
314-
if err := f.probeSourceSegment(job.RequestID, segment, job.SegmentingTargetURL); err != nil {
337+
if segCount == 0 || n <= 0 {
338+
return nil
339+
}
340+
341+
// For short streams, just probe every segment.
342+
if segCount <= n {
343+
for i := 0; i < segCount; i++ {
344+
if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
315345
return err
316346
}
317347
}
318348
return nil
319349
}
320-
segmentsToCheck := []int{0, 1, 2, 3, segCount - 2, segCount - 1}
321-
for _, i := range segmentsToCheck {
322-
if err := f.probeSourceSegment(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
350+
351+
// For longer streams, probe the first (n-2) and the last two segments.
352+
for i := 0; i < n-2; i++ {
353+
if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
354+
return err
355+
}
356+
}
357+
for i := segCount - 2; i < segCount; i++ {
358+
if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
323359
return err
324360
}
325361
}
326362
return nil
327363
}
328364

329-
func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error {
365+
func (f *ffmpeg) checkFirstFrame(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error {
330366
u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI)
331367
if err != nil {
332368
return fmt.Errorf("error checking source segments: %w", err)
@@ -344,13 +380,26 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
344380
}
345381
// ffprobe should print I for i-frame
346382
if !strings.HasPrefix(output, "I") || strings.Contains(output, "non-existing PPS") {
347-
return fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe)
383+
return backoff.Permanent(fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe))
348384
}
349385
return nil
350386
}, retries(6)); err != nil {
351387
return err
352388
}
353389

390+
return nil
391+
}
392+
393+
func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error {
394+
u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI)
395+
if err != nil {
396+
return fmt.Errorf("error checking source segments: %w", err)
397+
}
398+
probeURL, err := clients.SignURL(u)
399+
if err != nil {
400+
return fmt.Errorf("failed to create signed url for %s: %w", u.Redacted(), err)
401+
}
402+
354403
if err := backoff.Retry(func() error {
355404
_, err = f.probe.ProbeFile(requestID, probeURL)
356405
if err != nil {

pipeline/ffmpeg_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,6 @@ func Test_probeSegments(t *testing.T) {
261261
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts"}, probe.probedUrls)
262262

263263
probe.probedUrls = []string{}
264-
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}, {URI: "6.ts"}})
265-
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts", "/5.ts", "/5.ts", "/6.ts", "/6.ts"}, probe.probedUrls)
264+
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}})
265+
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/4.ts", "/4.ts", "/5.ts", "/5.ts"}, probe.probedUrls)
266266
}

0 commit comments

Comments
 (0)