diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 80615c012..bb3bccedf 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -63,9 +63,16 @@ jobs:
       - name: Install dependencies
         uses: awalsh128/cache-apt-pkgs-action@latest
         with:
-          packages: ffmpeg
+          packages: ffmpeg libblas3 liblapack3
           version: 1.0
 
+      - name: Fix BLAS library symlinks
+        run: |
+          # Create symlinks so libblas.so.3 is in a standard search path
+          sudo ln -sf /usr/lib/x86_64-linux-gnu/blas/libblas.so.3 /usr/lib/x86_64-linux-gnu/libblas.so.3
+          sudo ln -sf /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3 /usr/lib/x86_64-linux-gnu/liblapack.so.3
+          sudo ldconfig
+
       - name: go fmt
         run: |
           go fmt ./...
@@ -110,9 +117,16 @@ jobs:
       - name: Install dependencies
         uses: awalsh128/cache-apt-pkgs-action@latest
         with:
-          packages: ffmpeg
+          packages: ffmpeg libblas3 liblapack3
           version: 1.0
 
+      - name: Fix BLAS library symlinks
+        run: |
+          # Create symlinks so libblas.so.3 is in a standard search path
+          sudo ln -sf /usr/lib/x86_64-linux-gnu/blas/libblas.so.3 /usr/lib/x86_64-linux-gnu/libblas.so.3
+          sudo ln -sf /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3 /usr/lib/x86_64-linux-gnu/liblapack.so.3
+          sudo ldconfig
+
       - name: Run cucumber tests
         run: |
           go install github.com/cucumber/godog/cmd/godog@latest
diff --git a/metrics/metrics.go b/metrics/metrics.go
index bbd2fe816..7c1c34789 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -63,7 +63,7 @@ type CatalystAPIMetrics struct {
 	AnalyticsMetrics AnalyticsMetrics
 }
 
-var vodLabels = []string{"source_codec_video", "source_codec_audio", "pipeline", "catalyst_region", "num_profiles", "stage", "version", "is_fallback_mode", "is_livepeer_supported", "is_clip", "is_thumbs"}
+var vodLabels = []string{"source_codec_video", "source_codec_audio", "pipeline", "catalyst_region", "num_profiles", "stage", "version", "is_fallback_mode", "is_livepeer_supported", "is_clip", "is_thumbs", "is_reencode"}
 
 func NewMetrics() *CatalystAPIMetrics {
 	m := &CatalystAPIMetrics{
diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go
index 7f4e96741..9708b6bb1 100644
--- a/pipeline/coordinator.go
+++ b/pipeline/coordinator.go
@@ -64,6 +64,7 @@ type UploadJobPayload struct {
 	FragMp4TargetURL *url.URL
 	ClipTargetURL *url.URL
 	ThumbnailsTargetURL *url.URL
+	ReencodeSegmentation bool
 	Mp4OnlyShort bool
 	AccessToken string
 	TranscodeAPIUrl string
@@ -639,6 +640,7 @@ func (c *Coordinator) finishJob(job *JobInfo, out *HandlerOutput, err error) {
 		strconv.FormatBool(job.LivepeerSupported),
 		strconv.FormatBool(job.ClipStrategy.Enabled),
 		strconv.FormatBool(job.ThumbnailsTargetURL != nil),
+		strconv.FormatBool(job.ReencodeSegmentation),
 	}
 
 	metrics.Metrics.VODPipelineMetrics.Count.
diff --git a/pipeline/coordinator_test.go b/pipeline/coordinator_test.go
index afac5d119..26004978b 100644
--- a/pipeline/coordinator_test.go
+++ b/pipeline/coordinator_test.go
@@ -556,6 +556,10 @@ type stubFFprobe struct {
 	Err error
 }
 
+func (f stubFFprobe) CheckFirstFrame(url string) (string, error) {
+	return "I", nil
+}
+
 func (f stubFFprobe) ProbeFile(_, _ string, _ ...string) (video.InputVideo, error) {
 	if f.Err != nil {
 		return video.InputVideo{}, f.Err
diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go
index 56952b0a4..b8cb9fc7d 100644
--- a/pipeline/ffmpeg.go
+++ b/pipeline/ffmpeg.go
@@ -2,6 +2,7 @@ package pipeline
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/url"
 	"os"
@@ -23,6 +24,10 @@ import (
 
 const LocalSourceFilePattern = "sourcevideo*.mp4"
 
+// ErrKeyframe indicates that a probed segment did not start with a keyframe and
+// requires re-segmenting with different parameters.
+var ErrKeyframe = errors.New("keyframe error")
+
 type ffmpeg struct {
 	// The base of where to output source segments to
 	SourceOutputURL *url.URL
@@ -45,7 +50,24 @@ func (f *ffmpeg) Name() string {
 }
 
 func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
+	// First attempt: try cheap "copy" based segmenting.
+	out, err := f.handleStartUploadJob(job, false)
+	if errors.Is(err, ErrKeyframe) {
+		// If we hit a keyframe error when probing source segments, re-run the
+		// whole pipeline from the top but with a more expensive segmentation
+		// mode that re-encodes and forces keyframes.
+		log.Log(job.RequestID, "keyframe error while probing source segments, retrying with re-encoding segmentation")
+		return f.handleStartUploadJob(job, true)
+	}
+	return out, err
+}
+
+// handleStartUploadJob contains the core logic of the ffmpeg pipeline. The
+// reencodeSegmentation flag controls whether we use a cheap "copy" based
+// segmenting pass or a more expensive re-encoding pass that forces keyframes.
+func (f *ffmpeg) handleStartUploadJob(job *JobInfo, reencodeSegmentation bool) (*HandlerOutput, error) {
 	log.Log(job.RequestID, "Handling job via FFMPEG/Livepeer pipeline")
+	job.ReencodeSegmentation = reencodeSegmentation
 
 	sourceOutputURL := f.SourceOutputURL.JoinPath(job.RequestID)
 	segmentingTargetURL := sourceOutputURL.JoinPath(config.SEGMENTING_SUBDIR, config.SEGMENTING_TARGET_MANIFEST)
@@ -58,7 +80,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
 	var localSourceTmp string
 	if job.InputFileInfo.Format != "hls" {
 		var err error
-		localSourceTmp, err = copyFileToLocalTmpAndSegment(job)
+		localSourceTmp, err = copyFileToLocalTmpAndSegment(job, reencodeSegmentation)
 		if err != nil {
 			return nil, err
 		}
@@ -287,7 +309,7 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS
 		return nil
 	}
 	segCount := len(sourceSegments)
-	if segCount < 4 {
+	if segCount < 6 {
 		for _, segment := range sourceSegments {
 			if err := f.probeSourceSegment(job.RequestID, segment, job.SegmentingTargetURL); err != nil {
 				return err
@@ -295,7 +317,7 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS
 		}
 		return nil
 	}
-	segmentsToCheck := []int{0, 1, segCount - 2, segCount - 1}
+	segmentsToCheck := []int{0, 1, 2, 3, segCount - 2, segCount - 1}
 	for _, i := range segmentsToCheck {
 		if err := f.probeSourceSegment(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
 			return err
@@ -313,6 +335,22 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
 	if err != nil {
 		return fmt.Errorf("failed to create signed url for %s: %w", u.Redacted(), err)
 	}
+
+	// check that the segment starts with a keyframe
+	if err := backoff.Retry(func() error {
+		output, err := f.probe.CheckFirstFrame(probeURL)
+		if err != nil {
+			return fmt.Errorf("failed to check segment starts with keyframe: %w", err)
+		}
+		// ffprobe prints I for an i-frame; any other result is deterministic, so fail permanently instead of retrying
+		if !strings.HasPrefix(output, "I") || strings.Contains(output, "non-existing PPS") {
+			return backoff.Permanent(fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe))
+		}
+		return nil
+	}, retries(6)); err != nil {
+		return err
+	}
+
 	if err := backoff.Retry(func() error {
 		_, err = f.probe.ProbeFile(requestID, probeURL)
 		if err != nil {
@@ -335,7 +373,7 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
 	return nil
 }
 
-func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
+func copyFileToLocalTmpAndSegment(job *JobInfo, reencodeSegmentation bool) (string, error) {
 	// Create a temporary local file to write to
 	localSourceFile, err := os.CreateTemp(os.TempDir(), LocalSourceFilePattern)
 	if err != nil {
@@ -358,7 +396,7 @@ func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
 	}
 
 	// Begin Segmenting
-	log.Log(job.RequestID, "Beginning segmenting via FFMPEG/Livepeer pipeline")
+	log.Log(job.RequestID, "Beginning segmenting via FFMPEG/Livepeer pipeline", "reencode", reencodeSegmentation)
 	job.ReportProgress(clients.TranscodeStatusPreparing, 0.5)
 
 	// FFMPEG fails when presented with a raw IP + Path type URL, so we prepend "http://" to it
@@ -368,7 +406,7 @@ func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
 	}
 
 	destinationURL := fmt.Sprintf("%s/api/ffmpeg/%s/index.m3u8", internalAddress, job.StreamName)
-	if err := video.Segment(localSourceFile.Name(), destinationURL, job.TargetSegmentSizeSecs); err != nil {
+	if err := video.Segment(localSourceFile.Name(), destinationURL, job.TargetSegmentSizeSecs, reencodeSegmentation); err != nil {
 		return "", err
 	}
 
diff --git a/pipeline/ffmpeg_test.go b/pipeline/ffmpeg_test.go
index 61d4b2ccf..e31790a8d 100644
--- a/pipeline/ffmpeg_test.go
+++ b/pipeline/ffmpeg_test.go
@@ -232,6 +232,10 @@ func (p *stubProbe) ProbeFile(requestID string, url string, ffProbeOptions ...st
 	return video.InputVideo{}, nil
 }
 
+func (p *stubProbe) CheckFirstFrame(url string) (string, error) {
+	return "I", nil
+}
+
 func Test_probeSegments(t *testing.T) {
 	probe := stubProbe{}
 	f := ffmpeg{
@@ -257,6 +261,6 @@ func Test_probeSegments(t *testing.T) {
 	require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts"}, probe.probedUrls)
 
 	probe.probedUrls = []string{}
-	_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}})
-	require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/4.ts", "/4.ts", "/5.ts", "/5.ts"}, probe.probedUrls)
+	_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}, {URI: "6.ts"}})
+	require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts", "/5.ts", "/5.ts", "/6.ts", "/6.ts"}, probe.probedUrls)
 }
diff --git a/video/probe.go b/video/probe.go
index c039db36d..c607cf51b 100644
--- a/video/probe.go
+++ b/video/probe.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"os/exec"
 	"strconv"
 	"strings"
 	"time"
@@ -20,6 +21,7 @@ var (
 
 type Prober interface {
 	ProbeFile(requestID, url string, ffProbeOptions ...string) (InputVideo, error)
+	CheckFirstFrame(url string) (string, error)
 }
 
 type Probe struct {
@@ -288,3 +290,13 @@ func containsStr(slc []string, val string) bool {
 	}
 	return false
 }
+
+// CheckFirstFrame runs ffprobe on url, limited to the first packet of the
+// first video stream, and returns the combined stdout/stderr output. On
+// success the output carries the frame's picture type (e.g. "I"); stderr is
+// included so callers can also inspect ffprobe's diagnostics.
+func (p Probe) CheckFirstFrame(url string) (string, error) {
+	cmd := exec.Command("ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "frame=pict_type", "-of", "csv=p=0", "-read_intervals", "%+#1", url)
+	output, err := cmd.CombinedOutput()
+	return string(output), err
+}
diff --git a/video/segment.go b/video/segment.go
index 4f4b9c079..c0324f4b6 100644
--- a/video/segment.go
+++ b/video/segment.go
@@ -8,28 +8,47 @@ import (
 	ffmpeg "github.com/u2takey/ffmpeg-go"
 )
 
-// Segment splits a source video URL into segments
+// Segment splits a source video URL into segments.
 //
 // FFMPEG can use remote files, but depending on the layout of the file can get bogged
 // down and end up making multiple range requests per segment.
 // Because of this, we download first and then clean up at the end.
-func Segment(sourceFilename string, outputManifestURL string, targetSegmentSize int64) error {
+//
+// The reencode parameter allows callers to force a re-encoding pass that inserts
+// keyframes, which can be used as a fallback if simple "copy" based segmenting
+// produces segments that don't start on a keyframe.
+func Segment(sourceFilename string, outputManifestURL string, targetSegmentSize int64, reencode bool) error {
 	// Do the segmenting, using the local file as source
 	ffmpegErr := bytes.Buffer{}
+
+	// Arguments shared by both modes: AAC audio, MPEG-TS segments and an m3u8
+	// segment list. Video is stream-copied unless re-encoding is requested.
+	outputArgs := ffmpeg.KwArgs{
+		"c:a":               "aac",
+		"c:v":               "copy",
+		"f":                 "segment",
+		"segment_list":      outputManifestURL,
+		"segment_list_type": "m3u8",
+		"segment_format":    "mpegts",
+		"segment_time":      targetSegmentSize,
+		"min_seg_duration":  "2",
+	}
+	if reencode {
+		// More expensive path that forces keyframes on a fixed cadence and
+		// resets timestamps; used as a fallback for problematic inputs.
+		outputArgs["c:v"] = "libx264"
+		outputArgs["preset"] = "veryfast"
+		outputArgs["sc_threshold"] = "0"
+		outputArgs["force_key_frames"] = "expr:gte(t,n_forced*3)"
+		outputArgs["reset_timestamps"] = "1"
+	}
+
 	err := ffmpeg.Input(sourceFilename).
 		Output(
 			strings.Replace(outputManifestURL, ".m3u8", "", 1)+"%d.ts",
-			ffmpeg.KwArgs{
-				"c:a":               "aac",
-				"c:v":               "copy",
-				"f":                 "segment",
-				"segment_list":      outputManifestURL,
-				"segment_list_type": "m3u8",
-				"segment_format":    "mpegts",
-				"segment_time":      targetSegmentSize,
-				"min_seg_duration":  "2",
-			},
+			outputArgs,
 		).OverWriteOutput().WithErrorOutput(&ffmpegErr).Run()
+
 	if err != nil {
 		return fmt.Errorf("failed to segment source file (%s) [%s]: %s", sourceFilename, ffmpegErr.String(), err)
 	}