diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index b8cb9fc7..0153588a 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -183,6 +183,7 @@ func (f *ffmpeg) handleStartUploadJob(job *JobInfo, reencodeSegmentation bool) ( if err != nil { return nil, err } + log.Log(job.RequestID, "Finished probing source segments") outputs, transcodedSegments, err := transcode.RunTranscodeProcess(transcodeRequest, job.StreamName, inputInfo, f.Broadcaster) if err != nil { @@ -308,25 +309,60 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS if job.InputFileInfo.Format == "hls" { return nil } + const ( + keyframeCheckSegments = 12 + fullProbeSegments = 4 + ) + + // First, ensure the first frame of more segments is a keyframe. + if err := f.probeSegmentsWith(job, sourceSegments, keyframeCheckSegments, f.checkFirstFrame); err != nil { + return err + } + + // Then, run the heavier probing logic on a smaller subset. + if err := f.probeSegmentsWith(job, sourceSegments, fullProbeSegments, f.probeSourceSegment); err != nil { + return err + } + + return nil +} + +func (f *ffmpeg) probeSegmentsWith( + job *JobInfo, + sourceSegments []*m3u8.MediaSegment, + n int, + probeFn func(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error, +) error { segCount := len(sourceSegments) - if segCount < 6 { - for _, segment := range sourceSegments { - if err := f.probeSourceSegment(job.RequestID, segment, job.SegmentingTargetURL); err != nil { + if segCount == 0 || n <= 0 { + return nil + } + + // For short streams, just probe every segment. + if segCount <= n { + for i := 0; i < segCount; i++ { + if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil { return err } } return nil } - segmentsToCheck := []int{0, 1, 2, 3, segCount - 2, segCount - 1} - for _, i := range segmentsToCheck { - if err := f.probeSourceSegment(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil { + + // For longer streams, probe the first (n-2) and the last two segments. + for i := 0; i < n-2; i++ { + if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil { + return err + } + } + for i := segCount - 2; i < segCount; i++ { + if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil { return err } } return nil } -func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error { +func (f *ffmpeg) checkFirstFrame(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error { u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI) if err != nil { return fmt.Errorf("error checking source segments: %w", err) @@ -344,13 +380,26 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so } // ffprobe should print I for i-frame if !strings.HasPrefix(output, "I") || strings.Contains(output, "non-existing PPS") { - return fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe) + return backoff.Permanent(fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe)) } return nil }, retries(6)); err != nil { return err } + return nil +} + +func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error { + u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI) + if err != nil { + return fmt.Errorf("error checking source segments: %w", err) + } + probeURL, err := clients.SignURL(u) + if err != nil { + return fmt.Errorf("failed to create signed url for %s: %w", u.Redacted(), err) + } + if err := backoff.Retry(func() error { _, err = f.probe.ProbeFile(requestID, probeURL) if err != nil { diff --git a/pipeline/ffmpeg_test.go b/pipeline/ffmpeg_test.go index e31790a8..c6ff0d43 100644 --- a/pipeline/ffmpeg_test.go +++ b/pipeline/ffmpeg_test.go @@ -261,6 +261,6 @@ func Test_probeSegments(t *testing.T) { require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts"}, probe.probedUrls) probe.probedUrls = []string{} - _ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}, {URI: "6.ts"}}) - require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts", "/5.ts", "/5.ts", "/6.ts", "/6.ts"}, probe.probedUrls) + _ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}}) + require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/4.ts", "/4.ts", "/5.ts", "/5.ts"}, probe.probedUrls) }