Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,16 @@ jobs:
- name: Install dependencies
uses: awalsh128/cache-apt-pkgs-action@latest
with:
packages: ffmpeg
packages: ffmpeg libblas3 liblapack3
version: 1.0

- name: Fix BLAS library symlinks
run: |
# Create symlinks so libblas.so.3 is in a standard search path
sudo ln -sf /usr/lib/x86_64-linux-gnu/blas/libblas.so.3 /usr/lib/x86_64-linux-gnu/libblas.so.3
sudo ln -sf /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3 /usr/lib/x86_64-linux-gnu/liblapack.so.3
sudo ldconfig

- name: go fmt
run: |
go fmt ./...
Expand Down Expand Up @@ -110,9 +117,16 @@ jobs:
- name: Install dependencies
uses: awalsh128/cache-apt-pkgs-action@latest
with:
packages: ffmpeg
packages: ffmpeg libblas3 liblapack3
version: 1.0

- name: Fix BLAS library symlinks
run: |
# Create symlinks so libblas.so.3 is in a standard search path
sudo ln -sf /usr/lib/x86_64-linux-gnu/blas/libblas.so.3 /usr/lib/x86_64-linux-gnu/libblas.so.3
sudo ln -sf /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3 /usr/lib/x86_64-linux-gnu/liblapack.so.3
sudo ldconfig

- name: Run cucumber tests
run: |
go install github.com/cucumber/godog/cmd/godog@latest
Expand Down
2 changes: 1 addition & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type CatalystAPIMetrics struct {
AnalyticsMetrics AnalyticsMetrics
}

var vodLabels = []string{"source_codec_video", "source_codec_audio", "pipeline", "catalyst_region", "num_profiles", "stage", "version", "is_fallback_mode", "is_livepeer_supported", "is_clip", "is_thumbs"}
var vodLabels = []string{"source_codec_video", "source_codec_audio", "pipeline", "catalyst_region", "num_profiles", "stage", "version", "is_fallback_mode", "is_livepeer_supported", "is_clip", "is_thumbs", "is_reencode"}

func NewMetrics() *CatalystAPIMetrics {
m := &CatalystAPIMetrics{
Expand Down
2 changes: 2 additions & 0 deletions pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type UploadJobPayload struct {
FragMp4TargetURL *url.URL
ClipTargetURL *url.URL
ThumbnailsTargetURL *url.URL
ReencodeSegmentation bool
Mp4OnlyShort bool
AccessToken string
TranscodeAPIUrl string
Expand Down Expand Up @@ -639,6 +640,7 @@ func (c *Coordinator) finishJob(job *JobInfo, out *HandlerOutput, err error) {
strconv.FormatBool(job.LivepeerSupported),
strconv.FormatBool(job.ClipStrategy.Enabled),
strconv.FormatBool(job.ThumbnailsTargetURL != nil),
strconv.FormatBool(job.ReencodeSegmentation),
}

metrics.Metrics.VODPipelineMetrics.Count.
Expand Down
4 changes: 4 additions & 0 deletions pipeline/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ type stubFFprobe struct {
Err error
}

func (f stubFFprobe) CheckFirstFrame(url string) (string, error) {
return "I", nil
}

func (f stubFFprobe) ProbeFile(_, _ string, _ ...string) (video.InputVideo, error) {
if f.Err != nil {
return video.InputVideo{}, f.Err
Expand Down
50 changes: 44 additions & 6 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"context"
"errors"
"fmt"
"net/url"
"os"
Expand All @@ -23,6 +24,10 @@ import (

const LocalSourceFilePattern = "sourcevideo*.mp4"

// ErrKeyframe indicates that a probed segment did not start with a keyframe and
// requires re-segmenting with different parameters.
var ErrKeyframe = errors.New("keyframe error")

type ffmpeg struct {
// The base of where to output source segments to
SourceOutputURL *url.URL
Expand All @@ -45,7 +50,24 @@ func (f *ffmpeg) Name() string {
}

func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
// First attempt: try cheap "copy" based segmenting.
out, err := f.handleStartUploadJob(job, false)
if err != nil && errors.Is(err, ErrKeyframe) {
// If we hit a keyframe error when probing source segments, re-run the
// whole pipeline from the top but with a more expensive segmentation
// mode that re-encodes and forces keyframes.
log.Log(job.RequestID, "keyframe error while probing source segments, retrying with re-encoding segmentation")
return f.handleStartUploadJob(job, true)
}
return out, err
}

// handleStartUploadJob contains the core logic of the ffmpeg pipeline. The
// reencodeSegmentation flag controls whether we use a cheap "copy" based
// segmenting pass or a more expensive re-encoding pass that forces keyframes.
func (f *ffmpeg) handleStartUploadJob(job *JobInfo, reencodeSegmentation bool) (*HandlerOutput, error) {
log.Log(job.RequestID, "Handling job via FFMPEG/Livepeer pipeline")
job.ReencodeSegmentation = reencodeSegmentation

sourceOutputURL := f.SourceOutputURL.JoinPath(job.RequestID)
segmentingTargetURL := sourceOutputURL.JoinPath(config.SEGMENTING_SUBDIR, config.SEGMENTING_TARGET_MANIFEST)
Expand All @@ -58,7 +80,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
var localSourceTmp string
if job.InputFileInfo.Format != "hls" {
var err error
localSourceTmp, err = copyFileToLocalTmpAndSegment(job)
localSourceTmp, err = copyFileToLocalTmpAndSegment(job, reencodeSegmentation)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,15 +309,15 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS
return nil
}
segCount := len(sourceSegments)
if segCount < 4 {
if segCount < 6 {
for _, segment := range sourceSegments {
if err := f.probeSourceSegment(job.RequestID, segment, job.SegmentingTargetURL); err != nil {
return err
}
}
return nil
}
segmentsToCheck := []int{0, 1, segCount - 2, segCount - 1}
segmentsToCheck := []int{0, 1, 2, 3, segCount - 2, segCount - 1}
for _, i := range segmentsToCheck {
if err := f.probeSourceSegment(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
return err
Expand All @@ -313,6 +335,22 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
if err != nil {
return fmt.Errorf("failed to create signed url for %s: %w", u.Redacted(), err)
}

// check that the segment starts with a keyframe
if err := backoff.Retry(func() error {
output, err := f.probe.CheckFirstFrame(probeURL)
if err != nil {
return fmt.Errorf("failed to check segment starts with keyframe: %w", err)
}
// ffprobe should print I for i-frame
if !strings.HasPrefix(output, "I") || strings.Contains(output, "non-existing PPS") {
return fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe)
}
return nil
}, retries(6)); err != nil {
return err
}

if err := backoff.Retry(func() error {
_, err = f.probe.ProbeFile(requestID, probeURL)
if err != nil {
Expand All @@ -335,7 +373,7 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
return nil
}

func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
func copyFileToLocalTmpAndSegment(job *JobInfo, reencodeSegmentation bool) (string, error) {
// Create a temporary local file to write to
localSourceFile, err := os.CreateTemp(os.TempDir(), LocalSourceFilePattern)
if err != nil {
Expand All @@ -358,7 +396,7 @@ func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
}

// Begin Segmenting
log.Log(job.RequestID, "Beginning segmenting via FFMPEG/Livepeer pipeline")
log.Log(job.RequestID, "Beginning segmenting via FFMPEG/Livepeer pipeline", "reencode", reencodeSegmentation)
job.ReportProgress(clients.TranscodeStatusPreparing, 0.5)

// FFMPEG fails when presented with a raw IP + Path type URL, so we prepend "http://" to it
Expand All @@ -368,7 +406,7 @@ func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
}

destinationURL := fmt.Sprintf("%s/api/ffmpeg/%s/index.m3u8", internalAddress, job.StreamName)
if err := video.Segment(localSourceFile.Name(), destinationURL, job.TargetSegmentSizeSecs); err != nil {
if err := video.Segment(localSourceFile.Name(), destinationURL, job.TargetSegmentSizeSecs, reencodeSegmentation); err != nil {
return "", err
}

Expand Down
8 changes: 6 additions & 2 deletions pipeline/ffmpeg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func (p *stubProbe) ProbeFile(requestID string, url string, ffProbeOptions ...st
return video.InputVideo{}, nil
}

func (p *stubProbe) CheckFirstFrame(url string) (string, error) {
return "I", nil
}

func Test_probeSegments(t *testing.T) {
probe := stubProbe{}
f := ffmpeg{
Expand All @@ -257,6 +261,6 @@ func Test_probeSegments(t *testing.T) {
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts"}, probe.probedUrls)

probe.probedUrls = []string{}
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}})
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/4.ts", "/4.ts", "/5.ts", "/5.ts"}, probe.probedUrls)
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}, {URI: "6.ts"}})
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts", "/5.ts", "/5.ts", "/6.ts", "/6.ts"}, probe.probedUrls)
}
8 changes: 8 additions & 0 deletions video/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os/exec"
"strconv"
"strings"
"time"
Expand All @@ -20,6 +21,7 @@ var (

type Prober interface {
ProbeFile(requestID, url string, ffProbeOptions ...string) (InputVideo, error)
CheckFirstFrame(url string) (string, error)
}

type Probe struct {
Expand Down Expand Up @@ -288,3 +290,9 @@ func containsStr(slc []string, val string) bool {
}
return false
}

func (p Probe) CheckFirstFrame(url string) (string, error) {
cmd := exec.Command("ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "frame=pict_type", "-of", "csv=p=0", "-read_intervals", "%+#1", url)
output, err := cmd.CombinedOutput()
return string(output), err
}
53 changes: 41 additions & 12 deletions video/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,57 @@ import (
ffmpeg "github.com/u2takey/ffmpeg-go"
)

// Segment splits a source video URL into segments
// Segment splits a source video URL into segments.
//
// FFMPEG can use remote files, but depending on the layout of the file can get bogged
// down and end up making multiple range requests per segment.
// Because of this, we download first and then clean up at the end.
func Segment(sourceFilename string, outputManifestURL string, targetSegmentSize int64) error {
//
// The reencode parameter allows callers to force a re-encoding pass that inserts
// keyframes, which can be used as a fallback if simple "copy" based segmenting
// produces segments that don't start on a keyframe.
func Segment(sourceFilename string, outputManifestURL string, targetSegmentSize int64, reencode bool) error {
// Do the segmenting, using the local file as source
ffmpegErr := bytes.Buffer{}

var outputArgs ffmpeg.KwArgs
if reencode {
// More expensive path that forces keyframes on a fixed cadence and
// resets timestamps; used as a fallback for problematic inputs.
outputArgs = ffmpeg.KwArgs{
"c:v": "libx264",
"preset": "veryfast",
"sc_threshold": "0",
"force_key_frames": "expr:gte(t,n_forced*3)",
"c:a": "aac",
"f": "segment",
"segment_list": outputManifestURL,
"segment_list_type": "m3u8",
"segment_format": "mpegts",
"segment_time": targetSegmentSize,
"min_seg_duration": "2",
"reset_timestamps": "1",
}
} else {
// Faster path that keeps the original encoding and simply remuxes to TS.
outputArgs = ffmpeg.KwArgs{
"c:a": "aac",
"c:v": "copy",
"f": "segment",
"segment_list": outputManifestURL,
"segment_list_type": "m3u8",
"segment_format": "mpegts",
"segment_time": targetSegmentSize,
"min_seg_duration": "2",
}
}

err := ffmpeg.Input(sourceFilename).
Output(
strings.Replace(outputManifestURL, ".m3u8", "", 1)+"%d.ts",
ffmpeg.KwArgs{
"c:a": "aac",
"c:v": "copy",
"f": "segment",
"segment_list": outputManifestURL,
"segment_list_type": "m3u8",
"segment_format": "mpegts",
"segment_time": targetSegmentSize,
"min_seg_duration": "2",
},
outputArgs,
).OverWriteOutput().WithErrorOutput(&ffmpegErr).Run()

if err != nil {
return fmt.Errorf("failed to segment source file (%s) [%s]: %s", sourceFilename, ffmpegErr.String(), err)
}
Expand Down
Loading