Skip to content

Commit f1738b4

Browse files
authored
Re-encode during segmenting when needed (#1406)
* Switch to an encode command for segmenting This ensures that we have a keyframe at the beginning of every segment * Conditional re-encode * fix test * try to fix tests * Add a metric tag for tracking * try again
1 parent f16c9e7 commit f1738b4

File tree

8 files changed

+122
-23
lines changed

8 files changed

+122
-23
lines changed

.github/workflows/test.yaml

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,16 @@ jobs:
6363
- name: Install dependencies
6464
uses: awalsh128/cache-apt-pkgs-action@latest
6565
with:
66-
packages: ffmpeg
66+
packages: ffmpeg libblas3 liblapack3
6767
version: 1.0
6868

69+
- name: Fix BLAS library symlinks
70+
run: |
71+
# Create symlinks so libblas.so.3 is in a standard search path
72+
sudo ln -sf /usr/lib/x86_64-linux-gnu/blas/libblas.so.3 /usr/lib/x86_64-linux-gnu/libblas.so.3
73+
sudo ln -sf /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3 /usr/lib/x86_64-linux-gnu/liblapack.so.3
74+
sudo ldconfig
75+
6976
- name: go fmt
7077
run: |
7178
go fmt ./...
@@ -110,9 +117,16 @@ jobs:
110117
- name: Install dependencies
111118
uses: awalsh128/cache-apt-pkgs-action@latest
112119
with:
113-
packages: ffmpeg
120+
packages: ffmpeg libblas3 liblapack3
114121
version: 1.0
115122

123+
- name: Fix BLAS library symlinks
124+
run: |
125+
# Create symlinks so libblas.so.3 is in a standard search path
126+
sudo ln -sf /usr/lib/x86_64-linux-gnu/blas/libblas.so.3 /usr/lib/x86_64-linux-gnu/libblas.so.3
127+
sudo ln -sf /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3 /usr/lib/x86_64-linux-gnu/liblapack.so.3
128+
sudo ldconfig
129+
116130
- name: Run cucumber tests
117131
run: |
118132
go install github.com/cucumber/godog/cmd/godog@latest

metrics/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type CatalystAPIMetrics struct {
6363
AnalyticsMetrics AnalyticsMetrics
6464
}
6565

66-
var vodLabels = []string{"source_codec_video", "source_codec_audio", "pipeline", "catalyst_region", "num_profiles", "stage", "version", "is_fallback_mode", "is_livepeer_supported", "is_clip", "is_thumbs"}
66+
var vodLabels = []string{"source_codec_video", "source_codec_audio", "pipeline", "catalyst_region", "num_profiles", "stage", "version", "is_fallback_mode", "is_livepeer_supported", "is_clip", "is_thumbs", "is_reencode"}
6767

6868
func NewMetrics() *CatalystAPIMetrics {
6969
m := &CatalystAPIMetrics{

pipeline/coordinator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type UploadJobPayload struct {
6464
FragMp4TargetURL *url.URL
6565
ClipTargetURL *url.URL
6666
ThumbnailsTargetURL *url.URL
67+
ReencodeSegmentation bool
6768
Mp4OnlyShort bool
6869
AccessToken string
6970
TranscodeAPIUrl string
@@ -639,6 +640,7 @@ func (c *Coordinator) finishJob(job *JobInfo, out *HandlerOutput, err error) {
639640
strconv.FormatBool(job.LivepeerSupported),
640641
strconv.FormatBool(job.ClipStrategy.Enabled),
641642
strconv.FormatBool(job.ThumbnailsTargetURL != nil),
643+
strconv.FormatBool(job.ReencodeSegmentation),
642644
}
643645

644646
metrics.Metrics.VODPipelineMetrics.Count.

pipeline/coordinator_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ type stubFFprobe struct {
556556
Err error
557557
}
558558

559+
func (f stubFFprobe) CheckFirstFrame(url string) (string, error) {
560+
return "I", nil
561+
}
562+
559563
func (f stubFFprobe) ProbeFile(_, _ string, _ ...string) (video.InputVideo, error) {
560564
if f.Err != nil {
561565
return video.InputVideo{}, f.Err

pipeline/ffmpeg.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pipeline
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net/url"
78
"os"
@@ -23,6 +24,10 @@ import (
2324

2425
const LocalSourceFilePattern = "sourcevideo*.mp4"
2526

27+
// ErrKeyframe indicates that a probed segment did not start with a keyframe and
28+
// requires re-segmenting with different parameters.
29+
var ErrKeyframe = errors.New("keyframe error")
30+
2631
type ffmpeg struct {
2732
// The base of where to output source segments to
2833
SourceOutputURL *url.URL
@@ -45,7 +50,24 @@ func (f *ffmpeg) Name() string {
4550
}
4651

4752
func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
53+
// First attempt: try cheap "copy" based segmenting.
54+
out, err := f.handleStartUploadJob(job, false)
55+
if err != nil && errors.Is(err, ErrKeyframe) {
56+
// If we hit a keyframe error when probing source segments, re-run the
57+
// whole pipeline from the top but with a more expensive segmentation
58+
// mode that re-encodes and forces keyframes.
59+
log.Log(job.RequestID, "keyframe error while probing source segments, retrying with re-encoding segmentation")
60+
return f.handleStartUploadJob(job, true)
61+
}
62+
return out, err
63+
}
64+
65+
// handleStartUploadJob contains the core logic of the ffmpeg pipeline. The
66+
// reencodeSegmentation flag controls whether we use a cheap "copy" based
67+
// segmenting pass or a more expensive re-encoding pass that forces keyframes.
68+
func (f *ffmpeg) handleStartUploadJob(job *JobInfo, reencodeSegmentation bool) (*HandlerOutput, error) {
4869
log.Log(job.RequestID, "Handling job via FFMPEG/Livepeer pipeline")
70+
job.ReencodeSegmentation = reencodeSegmentation
4971

5072
sourceOutputURL := f.SourceOutputURL.JoinPath(job.RequestID)
5173
segmentingTargetURL := sourceOutputURL.JoinPath(config.SEGMENTING_SUBDIR, config.SEGMENTING_TARGET_MANIFEST)
@@ -58,7 +80,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
5880
var localSourceTmp string
5981
if job.InputFileInfo.Format != "hls" {
6082
var err error
61-
localSourceTmp, err = copyFileToLocalTmpAndSegment(job)
83+
localSourceTmp, err = copyFileToLocalTmpAndSegment(job, reencodeSegmentation)
6284
if err != nil {
6385
return nil, err
6486
}
@@ -287,15 +309,15 @@ func (f *ffmpeg) probeSourceSegments(job *JobInfo, sourceSegments []*m3u8.MediaS
287309
return nil
288310
}
289311
segCount := len(sourceSegments)
290-
if segCount < 4 {
312+
if segCount < 6 {
291313
for _, segment := range sourceSegments {
292314
if err := f.probeSourceSegment(job.RequestID, segment, job.SegmentingTargetURL); err != nil {
293315
return err
294316
}
295317
}
296318
return nil
297319
}
298-
segmentsToCheck := []int{0, 1, segCount - 2, segCount - 1}
320+
segmentsToCheck := []int{0, 1, 2, 3, segCount - 2, segCount - 1}
299321
for _, i := range segmentsToCheck {
300322
if err := f.probeSourceSegment(job.RequestID, sourceSegments[i], job.SegmentingTargetURL); err != nil {
301323
return err
@@ -313,6 +335,22 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
313335
if err != nil {
314336
return fmt.Errorf("failed to create signed url for %s: %w", u.Redacted(), err)
315337
}
338+
339+
// check that the segment starts with a keyframe
340+
if err := backoff.Retry(func() error {
341+
output, err := f.probe.CheckFirstFrame(probeURL)
342+
if err != nil {
343+
return fmt.Errorf("failed to check segment starts with keyframe: %w", err)
344+
}
345+
// ffprobe should print I for i-frame
346+
if !strings.HasPrefix(output, "I") || strings.Contains(output, "non-existing PPS") {
347+
return fmt.Errorf("segment does not start with keyframe: %w", ErrKeyframe)
348+
}
349+
return nil
350+
}, retries(6)); err != nil {
351+
return err
352+
}
353+
316354
if err := backoff.Retry(func() error {
317355
_, err = f.probe.ProbeFile(requestID, probeURL)
318356
if err != nil {
@@ -335,7 +373,7 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
335373
return nil
336374
}
337375

338-
func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
376+
func copyFileToLocalTmpAndSegment(job *JobInfo, reencodeSegmentation bool) (string, error) {
339377
// Create a temporary local file to write to
340378
localSourceFile, err := os.CreateTemp(os.TempDir(), LocalSourceFilePattern)
341379
if err != nil {
@@ -358,7 +396,7 @@ func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
358396
}
359397

360398
// Begin Segmenting
361-
log.Log(job.RequestID, "Beginning segmenting via FFMPEG/Livepeer pipeline")
399+
log.Log(job.RequestID, "Beginning segmenting via FFMPEG/Livepeer pipeline", "reencode", reencodeSegmentation)
362400
job.ReportProgress(clients.TranscodeStatusPreparing, 0.5)
363401

364402
// FFMPEG fails when presented with a raw IP + Path type URL, so we prepend "http://" to it
@@ -368,7 +406,7 @@ func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
368406
}
369407

370408
destinationURL := fmt.Sprintf("%s/api/ffmpeg/%s/index.m3u8", internalAddress, job.StreamName)
371-
if err := video.Segment(localSourceFile.Name(), destinationURL, job.TargetSegmentSizeSecs); err != nil {
409+
if err := video.Segment(localSourceFile.Name(), destinationURL, job.TargetSegmentSizeSecs, reencodeSegmentation); err != nil {
372410
return "", err
373411
}
374412

pipeline/ffmpeg_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,10 @@ func (p *stubProbe) ProbeFile(requestID string, url string, ffProbeOptions ...st
232232
return video.InputVideo{}, nil
233233
}
234234

235+
func (p *stubProbe) CheckFirstFrame(url string) (string, error) {
236+
return "I", nil
237+
}
238+
235239
func Test_probeSegments(t *testing.T) {
236240
probe := stubProbe{}
237241
f := ffmpeg{
@@ -257,6 +261,6 @@ func Test_probeSegments(t *testing.T) {
257261
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts"}, probe.probedUrls)
258262

259263
probe.probedUrls = []string{}
260-
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}})
261-
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/4.ts", "/4.ts", "/5.ts", "/5.ts"}, probe.probedUrls)
264+
_ = f.probeSourceSegments(job, []*m3u8.MediaSegment{{URI: "0.ts"}, {URI: "1.ts"}, {URI: "2.ts"}, {URI: "3.ts"}, {URI: "4.ts"}, {URI: "5.ts"}, {URI: "6.ts"}})
265+
require.Equal(t, []string{"/0.ts", "/0.ts", "/1.ts", "/1.ts", "/2.ts", "/2.ts", "/3.ts", "/3.ts", "/5.ts", "/5.ts", "/6.ts", "/6.ts"}, probe.probedUrls)
262266
}

video/probe.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"os/exec"
78
"strconv"
89
"strings"
910
"time"
@@ -20,6 +21,7 @@ var (
2021

2122
type Prober interface {
2223
ProbeFile(requestID, url string, ffProbeOptions ...string) (InputVideo, error)
24+
CheckFirstFrame(url string) (string, error)
2325
}
2426

2527
type Probe struct {
@@ -288,3 +290,9 @@ func containsStr(slc []string, val string) bool {
288290
}
289291
return false
290292
}
293+
294+
func (p Probe) CheckFirstFrame(url string) (string, error) {
295+
cmd := exec.Command("ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "frame=pict_type", "-of", "csv=p=0", "-read_intervals", "%+#1", url)
296+
output, err := cmd.CombinedOutput()
297+
return string(output), err
298+
}

video/segment.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,57 @@ import (
88
ffmpeg "github.com/u2takey/ffmpeg-go"
99
)
1010

11-
// Segment splits a source video URL into segments
11+
// Segment splits a source video URL into segments.
1212
//
1313
// FFMPEG can use remote files, but depending on the layout of the file can get bogged
1414
// down and end up making multiple range requests per segment.
1515
// Because of this, we download first and then clean up at the end.
16-
func Segment(sourceFilename string, outputManifestURL string, targetSegmentSize int64) error {
16+
//
17+
// The reencode parameter allows callers to force a re-encoding pass that inserts
18+
// keyframes, which can be used as a fallback if simple "copy" based segmenting
19+
// produces segments that don't start on a keyframe.
20+
func Segment(sourceFilename string, outputManifestURL string, targetSegmentSize int64, reencode bool) error {
1721
// Do the segmenting, using the local file as source
1822
ffmpegErr := bytes.Buffer{}
23+
24+
var outputArgs ffmpeg.KwArgs
25+
if reencode {
26+
// More expensive path that forces keyframes on a fixed cadence and
27+
// resets timestamps; used as a fallback for problematic inputs.
28+
outputArgs = ffmpeg.KwArgs{
29+
"c:v": "libx264",
30+
"preset": "veryfast",
31+
"sc_threshold": "0",
32+
"force_key_frames": "expr:gte(t,n_forced*3)",
33+
"c:a": "aac",
34+
"f": "segment",
35+
"segment_list": outputManifestURL,
36+
"segment_list_type": "m3u8",
37+
"segment_format": "mpegts",
38+
"segment_time": targetSegmentSize,
39+
"min_seg_duration": "2",
40+
"reset_timestamps": "1",
41+
}
42+
} else {
43+
// Faster path that keeps the original encoding and simply remuxes to TS.
44+
outputArgs = ffmpeg.KwArgs{
45+
"c:a": "aac",
46+
"c:v": "copy",
47+
"f": "segment",
48+
"segment_list": outputManifestURL,
49+
"segment_list_type": "m3u8",
50+
"segment_format": "mpegts",
51+
"segment_time": targetSegmentSize,
52+
"min_seg_duration": "2",
53+
}
54+
}
55+
1956
err := ffmpeg.Input(sourceFilename).
2057
Output(
2158
strings.Replace(outputManifestURL, ".m3u8", "", 1)+"%d.ts",
22-
ffmpeg.KwArgs{
23-
"c:a": "aac",
24-
"c:v": "copy",
25-
"f": "segment",
26-
"segment_list": outputManifestURL,
27-
"segment_list_type": "m3u8",
28-
"segment_format": "mpegts",
29-
"segment_time": targetSegmentSize,
30-
"min_seg_duration": "2",
31-
},
59+
outputArgs,
3260
).OverWriteOutput().WithErrorOutput(&ffmpegErr).Run()
61+
3362
if err != nil {
3463
return fmt.Errorf("failed to segment source file (%s) [%s]: %s", sourceFilename, ffmpegErr.String(), err)
3564
}

0 commit comments

Comments
 (0)