Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions transcoder/src/audiostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (t *Transcoder) NewAudioStream(file *FileStream, idx uint32) (*AudioStream,
return ret, nil
}

func (as *AudioStream) getOutPath(encoder_id int) string {
return fmt.Sprintf("%s/segment-a%d-%d-%%d.ts", as.file.Out, as.index, encoder_id)
// getIdentifier returns the short name identifying this audio stream: the
// letter "a" followed by the stream index.
func (as *AudioStream) getIdentifier() string {
	identifier := fmt.Sprintf("a%d", as.index)
	return identifier
}

func (as *AudioStream) getFlags() Flags {
Expand Down
10 changes: 5 additions & 5 deletions transcoder/src/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (fs *FileStream) Destroy() {
}

func (fs *FileStream) GetMaster() string {
master := "#EXTM3U\n"
master := "#EXTM3U"

// TODO: support multiple audio qualities (and original)
for _, audio := range fs.Info.Audios {
Expand All @@ -86,7 +86,7 @@ func (fs *FileStream) GetMaster() string {
master += "DEFAULT=YES,"
}
master += "CHANNELS=\"2\","
master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index)
master += fmt.Sprintf("URI=\"audio/%d/index.m3u8\"\n", audio.Index)
}
master += "\n"

Expand Down Expand Up @@ -131,7 +131,7 @@ func (fs *FileStream) GetMaster() string {
if video == *def_video {
master += "DEFAULT=YES\n"
} else {
master += fmt.Sprintf("URI=\"./%d/%s/index.m3u8\"\n", video.Index, quality)
master += fmt.Sprintf("URI=\"%d/%s/index.m3u8\"\n", video.Index, quality)
}
}
}
Expand All @@ -149,7 +149,7 @@ func (fs *FileStream) GetMaster() string {
}
master += "AUDIO=\"audio\","
master += "CLOSED-CAPTIONS=NONE\n"
master += fmt.Sprintf("./%d/%s/index.m3u8\n", def_video.Index, Original)
master += fmt.Sprintf("%d/%s/index.m3u8\n", def_video.Index, Original)
}

aspectRatio := float32(def_video.Width) / float32(def_video.Height)
Expand All @@ -167,7 +167,7 @@ func (fs *FileStream) GetMaster() string {
master += fmt.Sprintf("CODECS=\"%s\",", strings.Join([]string{transmux_codec, audio_codec}, ","))
master += "AUDIO=\"audio\","
master += "CLOSED-CAPTIONS=NONE\n"
master += fmt.Sprintf("./%d/%s/index.m3u8\n", def_video.Index, quality)
master += fmt.Sprintf("%d/%s/index.m3u8\n", def_video.Index, quality)
}
}

Expand Down
36 changes: 27 additions & 9 deletions transcoder/src/keyframes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"log"
"math"
"os/exec"
"strconv"
"strings"
Expand All @@ -12,7 +13,10 @@ import (
"github.com/lib/pq"
)

const KeyframeVersion = 1
const KeyframeVersion = 2

// In seconds. The spec recommends 6, but since we don't control keyframes we go over more often than not.
const OptimalFragmentDuration = float64(5)

type Keyframe struct {
Keyframes []float64
Expand All @@ -37,7 +41,12 @@ func (kf *Keyframe) Slice(start int32, end int32) []float64 {
}
kf.info.mutex.RLock()
defer kf.info.mutex.RUnlock()

ref := kf.Keyframes[start:end]
if kf.IsDone {
return ref
}
// make a copy since we will continue to mutate the array.
ret := make([]float64, end-start)
copy(ret, ref)
return ret
Expand Down Expand Up @@ -168,6 +177,7 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {

ret := make([]float64, 0, 1000)
limit := 100
last_frame := math.Inf(-1)
done := 0
// sometimes, videos can start at a timing greater than 0:00. We need to take that into account
// and only list keyframes that come after the start of the video (without that, our segments count
Expand All @@ -176,6 +186,9 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {
// We can't hardcode the first keyframe at 0 because the transcoder needs to reference durations of segments
// To handle this edge case, when we fetch the segment n0, no seeking is done but duration is computed from the
// first keyframe (instead of 0)
//
// since we switched to -f hls, duration is not computed anymore so we could hardcode the first timestamp at 0 but
// not changing it is easier.
for scanner.Scan() {
frame := scanner.Text()
if frame == "" {
Expand All @@ -199,12 +212,18 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {
return err
}

// Before, we wanted to only save keyframes with at least 3s betweens
// to prevent segments of 0.2s but sometimes, the -f segment muxer discards
// the segment time and decide to cut at a random keyframe. Having every keyframe
// handled as a segment prevents that.
// The -f hls encoder will cut at the next keyframe found every X seconds.
// To be sure there is no mismatch between what -f hls outputs and what we think segments are, we need to make
// sure every keyframe is at least `OptimalFragmentDuration` away from the next.
//
// Note that this handling was not possible with the -f segment muxer, since it sometimes ignores the
// -segment_times option and outputs segments of 0.2s. With -f segment, we needed to cut at every keyframe.
if fpts < (last_frame + OptimalFragmentDuration) {
continue
}

ret = append(ret, fpts)
last_frame = fpts

if len(ret) == limit {
kf.add(ret)
Expand All @@ -226,13 +245,12 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error {
return nil
}

// we can pretty much cut audio at any point so no need to get specific frames, just cut every 4s
// we can pretty much cut audio at any point so no need to get specific frames, just cut every `OptimalFragmentDuration` seconds
func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error {
dummyKeyframeDuration := float64(4)
segmentCount := int((float64(info.Duration) / dummyKeyframeDuration) + 1)
segmentCount := int((float64(info.Duration) / OptimalFragmentDuration) + 1)
kf.Keyframes = make([]float64, segmentCount)
for segmentIndex := 0; segmentIndex < segmentCount; segmentIndex += 1 {
kf.Keyframes[segmentIndex] = float64(segmentIndex) * dummyKeyframeDuration
kf.Keyframes[segmentIndex] = float64(segmentIndex) * OptimalFragmentDuration
}

kf.IsDone = true
Expand Down
4 changes: 2 additions & 2 deletions transcoder/src/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func (s *MetadataService) GetMetadata(path string, sha string) (*MediaInfo, erro
if err != nil {
return nil, err
}
tx.Exec(`update videos set keyframes = nil where sha = $1`, sha)
tx.Exec(`update audios set keyframes = nil where sha = $1`, sha)
tx.Exec(`update videos set keyframes = null where sha = $1`, sha)
tx.Exec(`update audios set keyframes = null where sha = $1`, sha)
tx.Exec(`update info set ver_keyframes = 0 where sha = $1`, sha)
err = tx.Commit()
if err != nil {
Expand Down
75 changes: 46 additions & 29 deletions transcoder/src/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"os"
"os/exec"
"path/filepath"
"slices"
"strings"
"sync"
Expand All @@ -23,9 +22,12 @@ const (
Transmux Flags = 1 << 3
)

// First %d is encoder_id, second %d is segment number (escaped for ffmpeg)
const SegmentNameFormat = "%d-segment-%%d.ts"

type StreamHandle interface {
getTranscodeArgs(segments string) []string
getOutPath(encoder_id int) string
getIdentifier() string
getFlags() Flags
}

Expand Down Expand Up @@ -200,8 +202,8 @@ func (ts *Stream) run(start int32) error {
segments = []float64{9999999}
}

outpath := ts.handle.getOutPath(encoder_id)
err := os.MkdirAll(filepath.Dir(outpath), 0o755)
outpath := fmt.Sprintf("%s/%s", ts.file.Out, ts.handle.getIdentifier())
err := os.MkdirAll(outpath, 0o755)
if err != nil {
return err
}
Expand Down Expand Up @@ -253,23 +255,24 @@ func (ts *Stream) run(start int32) error {
)
args = append(args, ts.handle.getTranscodeArgs(toSegmentStr(segments))...)
args = append(args,
"-f", "segment",
// needed for rounding issues when forcing keyframes
// recommended value is 1/(2*frame_rate), which for a 24fps is ~0.021
// we take a little bit more than that to be extra safe but too much can be harmfull
// when segments are short (can make the video repeat itself)
"-segment_time_delta", "0.05",
"-segment_format", "mpegts",
"-segment_times", toSegmentStr(Map(segments, func(seg float64, _ int) float64 {
// segment_times want durations, not timestamps so we must substract the -ss param
// since we give a greater value to -ss to prevent wrong seeks but -segment_times
// needs precise segments, we use the keyframe we want to seek to as a reference.
return seg - ts.keyframes.Get(start_segment)
})),
"-segment_list_type", "flat",
"-segment_list", "pipe:1",
"-segment_start_number", fmt.Sprint(start_segment),
outpath,
"-f", "hls",
// we can't list cut times w/ hls but
// - -hls_time will cut on the next keyframe after the time has passed
// - we specify keyframes in transcode with -force_key_frames
// - we know the keyframe times of the transmux stream
// to ensure we don't have issues, the keyframe retriever needs to ignore
// sequential keyframes closer than OptimalFragmentDuration.
//
// audio is simpler since we always cut at OptimalFragmentDuration
"-hls_time", fmt.Sprint(OptimalFragmentDuration),
"-start_number", fmt.Sprint(start_segment),
"-hls_segment_type", "mpegts",
"-hls_segment_filename", fmt.Sprintf("%s/%s", outpath, fmt.Sprintf(SegmentNameFormat, encoder_id)),
// Make the playlist easier to parse in our program by only outputting 1 segment and no endlist marker.
// Anyway, this list is only read once and we generate our own.
"-hls_list_size", "1",
"-hls_flags", "omit_endlist",
"-",
)

cmd := exec.Command("ffmpeg", args...)
Expand All @@ -292,12 +295,18 @@ func (ts *Stream) run(start int32) error {

go func() {
scanner := bufio.NewScanner(stdout)
format := filepath.Base(outpath)
format := fmt.Sprintf(SegmentNameFormat, encoder_id)
should_stop := false

for scanner.Scan() {
line := scanner.Text()
// ignore m3u8 infos, we only want to know when segments are ready.
if line[0] == '#' {
continue
}

var segment int32
_, _ = fmt.Sscanf(scanner.Text(), format, &segment)
_, _ = fmt.Sscanf(line, format, &segment)

if segment < start {
// This happens because we use -f segments for accurate cutting (since -ss is not)
Expand All @@ -310,7 +319,7 @@ func (ts *Stream) run(start int32) error {
if ts.isSegmentReady(segment) {
// the current segment is already marked as done so another process has already gone up to here.
cmd.Process.Signal(os.Interrupt)
log.Printf("Killing ffmpeg because segment %d is already ready", segment)
log.Printf("Killing ffmpeg %s-%d because segment %d is already ready", ts.handle.getIdentifier(), encoder_id, segment)
should_stop = true
} else {
ts.segments[segment].encoder = encoder_id
Expand All @@ -320,7 +329,7 @@ func (ts *Stream) run(start int32) error {
should_stop = true
} else if ts.isSegmentReady(segment + 1) {
cmd.Process.Signal(os.Interrupt)
log.Printf("Killing ffmpeg because next segment %d is ready", segment)
log.Printf("Killing ffmpeg %s-%d because next segment %d is ready", ts.handle.getIdentifier(), encoder_id, segment)
should_stop = true
}
}
Expand All @@ -340,11 +349,11 @@ func (ts *Stream) run(start int32) error {
go func() {
err := cmd.Wait()
if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 {
log.Printf("ffmpeg %d was killed by us", encoder_id)
log.Printf("ffmpeg %s-%d was killed by us", ts.handle.getIdentifier(), encoder_id)
} else if err != nil {
log.Printf("ffmpeg %d occured an error: %s: %s", encoder_id, err, stderr.String())
log.Printf("ffmpeg %s-%d occured an error: %s: %s", ts.handle.getIdentifier(), encoder_id, err, stderr.String())
} else {
log.Printf("ffmpeg %d finished successfully", encoder_id)
log.Printf("ffmpeg %s-%d finished successfully", ts.handle.getIdentifier(), encoder_id)
}

ts.lock.Lock()
Expand Down Expand Up @@ -420,7 +429,15 @@ func (ts *Stream) GetSegment(segment int32) (string, error) {
}
}
ts.prerareNextSegements(segment)
return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil
return fmt.Sprintf(
"%s/%s/%s",
ts.file.Out,
ts.handle.getIdentifier(),
fmt.Sprintf(
fmt.Sprintf(SegmentNameFormat, ts.segments[segment].encoder),
segment,
),
), nil
}

func (ts *Stream) prerareNextSegements(segment int32) {
Expand Down
4 changes: 2 additions & 2 deletions transcoder/src/videostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (vs *VideoStream) getFlags() Flags {
return VideoF
}

func (vs *VideoStream) getOutPath(encoder_id int) string {
return fmt.Sprintf("%s/segment-%s-%d-%%d.ts", vs.file.Out, vs.quality, encoder_id)
// getIdentifier returns the short name identifying this video stream: the
// letter "v", the index of the underlying video, a dash, and the quality.
func (vs *VideoStream) getIdentifier() string {
	identifier := fmt.Sprintf("v%d-%s", vs.video.Index, vs.quality)
	return identifier
}

func closestMultiple(n int32, x int32) int32 {
Expand Down