Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (f *ffmpeg) handleStartUploadJob(job *JobInfo, reencodeSegmentation bool) (
if err != nil {
return nil, err
}
log.Log(job.RequestID, "Finished probing source segments")

outputs, transcodedSegments, err := transcode.RunTranscodeProcess(transcodeRequest, job.StreamName, inputInfo, f.Broadcaster)
if err != nil {
Expand Down Expand Up @@ -308,25 +309,60 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS
if job.InputFileInfo.Format == "hls" {
return nil
}
const (
keyframeCheckSegments = 12
fullProbeSegments = 4
)

// First, ensure the first frame of more segments is a keyframe.
if err := f.probeSegmentsWith(job, sourceSegments, keyframeCheckSegments, f.checkFirstFrame); err != nil {
return err
}

// Then, run the heavier probing logic on a smaller subset.
if err := f.probeSegmentsWith(job, sourceSegments, fullProbeSegments, f.probeSourceSegment); err != nil {
return err
}

return nil
}

func (f *ffmpeg) probeSegmentsWith(
job *JobInfo,
sourceSegments []*m3u8.MediaSegment,
n int,
probeFn func(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error,
) error {
segCount := len(sourceSegments)
if segCount < 6 {
for _, segment := range sourceSegments {
if err := f.probeSourceSegment(job.RequestID, segment, job.SegmentingTargetURL); err != nil {
if segCount == 0 || n <= 0 {
return nil
}

// For short streams, just probe every segment.
if segCount <= n {
for i := 0; i < segCount; i++ {
if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
return err
}
}
return nil
}
segmentsToCheck := []int{0, 1, 2, 3, segCount - 2, segCount - 1}
for _, i := range segmentsToCheck {
if err := f.probeSourceSegment(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {

// For longer streams, probe the first (n-2) and the last two segments.
for i := 0; i < n-2; i++ {
if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
return err
}
}
for i := segCount - 2; i < segCount; i++ {
if err := probeFn(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
return err
}
}
return nil
}

func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error {
func (f *ffmpeg) checkFirstFrame(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error {
u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI)
if err != nil {
return fmt.Errorf("error checking source segments: %w", err)
Expand All @@ -344,13 +380,26 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
}
// ffprobe should print I for i-frame
if !strings.HasPrefix(output, "I") || strings.Contains(output, "non-existing PPS") {
return fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe)
return backoff.Permanent(fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe))
}
return nil
}, retries(6)); err != nil {
return err
}

return nil
}

func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, sourceManifestURL string) error {
u, err := clients.ManifestURLToSegmentURL(sourceManifestURL, seg.URI)
if err != nil {
return fmt.Errorf("error checking source segments: %w", err)
}
probeURL, err := clients.SignURL(u)
if err != nil {
return fmt.Errorf("failed to create signed url for %s: %w", u.Redacted(), err)
}

if err := backoff.Retry(func() error {
_, err = f.probe.ProbeFile(requestID, probeURL)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pipeline/ffmpeg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,6 @@ func Test_probeSegments(t *testing.T) {
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts"}, probe.probedUrls)

probe.probedUrls = []string{}
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}, {URI: "6.ts"}})
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts", "/5.ts", "/5.ts", "/6.ts", "/6.ts"}, probe.probedUrls)
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}})
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/4.ts", "/4.ts", "/5.ts", "/5.ts"}, probe.probedUrls)
}
Loading