diff --git a/transcoder/src/audiostream.go b/transcoder/src/audiostream.go index 24f561a4bc..e5e6641793 100644 --- a/transcoder/src/audiostream.go +++ b/transcoder/src/audiostream.go @@ -24,8 +24,8 @@ func (t *Transcoder) NewAudioStream(file *FileStream, idx uint32) (*AudioStream, return ret, nil } -func (as *AudioStream) getOutPath(encoder_id int) string { - return fmt.Sprintf("%s/segment-a%d-%d-%%d.ts", as.file.Out, as.index, encoder_id) +func (as *AudioStream) getIdentifier() string { + return fmt.Sprintf("a%d", as.index) } func (as *AudioStream) getFlags() Flags { diff --git a/transcoder/src/filestream.go b/transcoder/src/filestream.go index 3386053f03..bfbd0744cd 100644 --- a/transcoder/src/filestream.go +++ b/transcoder/src/filestream.go @@ -66,7 +66,7 @@ func (fs *FileStream) Destroy() { } func (fs *FileStream) GetMaster() string { - master := "#EXTM3U\n" + master := "#EXTM3U" // TODO: support multiples audio qualities (and original) for _, audio := range fs.Info.Audios { @@ -86,7 +86,7 @@ func (fs *FileStream) GetMaster() string { master += "DEFAULT=YES," } master += "CHANNELS=\"2\"," - master += fmt.Sprintf("URI=\"./audio/%d/index.m3u8\"\n", audio.Index) + master += fmt.Sprintf("URI=\"audio/%d/index.m3u8\"\n", audio.Index) } master += "\n" @@ -131,7 +131,7 @@ func (fs *FileStream) GetMaster() string { if video == *def_video { master += "DEFAULT=YES\n" } else { - master += fmt.Sprintf("URI=\"./%d/%s/index.m3u8\"\n", video.Index, quality) + master += fmt.Sprintf("URI=\"%d/%s/index.m3u8\"\n", video.Index, quality) } } } @@ -149,7 +149,7 @@ func (fs *FileStream) GetMaster() string { } master += "AUDIO=\"audio\"," master += "CLOSED-CAPTIONS=NONE\n" - master += fmt.Sprintf("./%d/%s/index.m3u8\n", def_video.Index, Original) + master += fmt.Sprintf("%d/%s/index.m3u8\n", def_video.Index, Original) } aspectRatio := float32(def_video.Width) / float32(def_video.Height) @@ -167,7 +167,7 @@ func (fs *FileStream) GetMaster() string { master += fmt.Sprintf("CODECS=\"%s\",", strings.Join([]string{transmux_codec, audio_codec}, ",")) master += "AUDIO=\"audio\"," master += "CLOSED-CAPTIONS=NONE\n" - master += fmt.Sprintf("./%d/%s/index.m3u8\n", def_video.Index, quality) + master += fmt.Sprintf("%d/%s/index.m3u8\n", def_video.Index, quality) } } diff --git a/transcoder/src/keyframes.go b/transcoder/src/keyframes.go index f8d74378aa..e8503a0128 100644 --- a/transcoder/src/keyframes.go +++ b/transcoder/src/keyframes.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "log" + "math" "os/exec" "strconv" "strings" @@ -12,7 +13,10 @@ import ( "github.com/lib/pq" ) -const KeyframeVersion = 1 +const KeyframeVersion = 2 + +// In seconds, the spec recomands 6 but since we don't control keyframes we go over more often than not. +const OptimalFragmentDuration = float64(5) type Keyframe struct { Keyframes []float64 @@ -37,7 +41,12 @@ func (kf *Keyframe) Slice(start int32, end int32) []float64 { } kf.info.mutex.RLock() defer kf.info.mutex.RUnlock() + ref := kf.Keyframes[start:end] + if kf.IsDone { + return ref + } + // make a copy since we will continue to mutate the array. ret := make([]float64, end-start) copy(ret, ref) return ret @@ -168,6 +177,7 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error { ret := make([]float64, 0, 1000) limit := 100 + last_frame := math.Inf(-1) done := 0 // sometimes, videos can start at a timing greater than 0:00. We need to take that into account // and only list keyframes that come after the start of the video (without that, our segments count @@ -176,6 +186,9 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error { // We can't hardcode the first keyframe at 0 because the transcoder needs to reference durations of segments // To handle this edge case, when we fetch the segment n0, no seeking is done but duration is computed from the // first keyframe (instead of 0) + // + // since we switched to -f hls, duration is not computed anymore so we could hardcode the first timestamp at 0 but + // not changing it is easier. for scanner.Scan() { frame := scanner.Text() if frame == "" { @@ -199,12 +212,18 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error { return err } - // Before, we wanted to only save keyframes with at least 3s betweens - // to prevent segments of 0.2s but sometimes, the -f segment muxer discards - // the segment time and decide to cut at a random keyframe. Having every keyframe - // handled as a segment prevents that. + // The -f hls encoder will cut at the next keyframe found every X seconds. + // To be sure there is no mismatch between what -f hls outputs and what we think segments are, we need to make + // sure every keyframes is at least `OptimalFragmentDuration` away from the next. + // + // Note that this handling was not possible with the -f segment muxer, since if sometimes doesn't care about the + // -segment_times option and outputed segments of 0.2s. With -f segment, we needed to cut at every keyframe. + if fpts < (last_frame + OptimalFragmentDuration) { + continue + } ret = append(ret, fpts) + last_frame = fpts if len(ret) == limit { kf.add(ret) @@ -226,13 +245,12 @@ func getVideoKeyframes(path string, video_idx uint32, kf *Keyframe) error { return nil } -// we can pretty much cut audio at any point so no need to get specific frames, just cut every 4s +// we can pretty much cut audio at any point so no need to get specific frames, just cut every `OptimalFragmentDuration` seconds func getAudioKeyframes(info *MediaInfo, audio_idx uint32, kf *Keyframe) error { - dummyKeyframeDuration := float64(4) - segmentCount := int((float64(info.Duration) / dummyKeyframeDuration) + 1) + segmentCount := int((float64(info.Duration) / OptimalFragmentDuration) + 1) kf.Keyframes = make([]float64, segmentCount) for segmentIndex := 0; segmentIndex < segmentCount; segmentIndex += 1 { - kf.Keyframes[segmentIndex] = float64(segmentIndex) * dummyKeyframeDuration + kf.Keyframes[segmentIndex] = float64(segmentIndex) * OptimalFragmentDuration } kf.IsDone = true diff --git a/transcoder/src/metadata.go b/transcoder/src/metadata.go index d4c84e2c58..c1fd2edf0f 100644 --- a/transcoder/src/metadata.go +++ b/transcoder/src/metadata.go @@ -83,8 +83,8 @@ func (s *MetadataService) GetMetadata(path string, sha string) (*MediaInfo, erro if err != nil { return nil, err } - tx.Exec(`update videos set keyframes = nil where sha = $1`, sha) - tx.Exec(`update audios set keyframes = nil where sha = $1`, sha) + tx.Exec(`update videos set keyframes = null where sha = $1`, sha) + tx.Exec(`update audios set keyframes = null where sha = $1`, sha) tx.Exec(`update info set ver_keyframes = 0 where sha = $1`, sha) err = tx.Commit() if err != nil { diff --git a/transcoder/src/stream.go b/transcoder/src/stream.go index 3692e3ecce..918d96744f 100644 --- a/transcoder/src/stream.go +++ b/transcoder/src/stream.go @@ -8,7 +8,6 @@ import ( "math" "os" "os/exec" - "path/filepath" "slices" "strings" "sync" @@ -23,9 +22,12 @@ const ( Transmux Flags = 1 << 3 ) +// First %d is encoder_id, second %d is segment number (escaped for ffmpeg) +const SegmentNameFormat = "%d-segment-%%d.ts" + type StreamHandle interface { getTranscodeArgs(segments string) []string - getOutPath(encoder_id int) string + getIdentifier() string getFlags() Flags } @@ -200,8 +202,8 @@ func (ts *Stream) run(start int32) error { segments = []float64{9999999} } - outpath := ts.handle.getOutPath(encoder_id) - err := os.MkdirAll(filepath.Dir(outpath), 0o755) + outpath := fmt.Sprintf("%s/%s", ts.file.Out, ts.handle.getIdentifier()) + err := os.MkdirAll(outpath, 0o755) if err != nil { return err } @@ -253,23 +255,24 @@ func (ts *Stream) run(start int32) error { ) args = append(args, ts.handle.getTranscodeArgs(toSegmentStr(segments))...) args = append(args, - "-f", "segment", - // needed for rounding issues when forcing keyframes - // recommended value is 1/(2*frame_rate), which for a 24fps is ~0.021 - // we take a little bit more than that to be extra safe but too much can be harmfull - // when segments are short (can make the video repeat itself) - "-segment_time_delta", "0.05", - "-segment_format", "mpegts", - "-segment_times", toSegmentStr(Map(segments, func(seg float64, _ int) float64 { - // segment_times want durations, not timestamps so we must substract the -ss param - // since we give a greater value to -ss to prevent wrong seeks but -segment_times - // needs precise segments, we use the keyframe we want to seek to as a reference. - return seg - ts.keyframes.Get(start_segment) - })), - "-segment_list_type", "flat", - "-segment_list", "pipe:1", - "-segment_start_number", fmt.Sprint(start_segment), - outpath, + "-f", "hls", + // we can't list cut times w/ hls but + // - -hls_time will be cut on the next key frame after the time has passed + // - we specify keyframes in transcode with -force_key_frames + // - we know keyframes time of the transmux stream + // to unsure we don't have issues, the keyframe retriver needs to ignore + // sequentials keyframes closer than OptimalFragmentDuration. + // + // audio is simpler since we always cut at OptimalFragmentDuration + "-hls_time", fmt.Sprint(OptimalFragmentDuration), + "-start_number", fmt.Sprint(start_segment), + "-hls_segment_type", "mpegts", + "-hls_segment_filename", fmt.Sprintf("%s/%s", outpath, fmt.Sprintf(SegmentNameFormat, encoder_id)), + // Make the playlist easier to parse in our program by only outputing 1 segment and no endlist marker + // anyways this list is only read once and we generate our own. + "-hls_list_size", "1", + "-hls_flags", "omit_endlist", + "-", ) cmd := exec.Command("ffmpeg", args...) @@ -292,12 +295,18 @@ func (ts *Stream) run(start int32) error { go func() { scanner := bufio.NewScanner(stdout) - format := filepath.Base(outpath) + format := fmt.Sprintf(SegmentNameFormat, encoder_id) should_stop := false for scanner.Scan() { + line := scanner.Text() + // ignore m3u8 infos, we only want to know when segments are ready. + if line[0] == '#' { + continue + } + var segment int32 - _, _ = fmt.Sscanf(scanner.Text(), format, &segment) + _, _ = fmt.Sscanf(line, format, &segment) if segment < start { // This happen because we use -f segments for accurate cutting (since -ss is not) @@ -310,7 +319,7 @@ func (ts *Stream) run(start int32) error { if ts.isSegmentReady(segment) { // the current segment is already marked at done so another process has already gone up to here. cmd.Process.Signal(os.Interrupt) - log.Printf("Killing ffmpeg because segment %d is already ready", segment) + log.Printf("Killing ffmpeg %s-%d because segment %d is already ready", ts.handle.getIdentifier(), encoder_id, segment) should_stop = true } else { ts.segments[segment].encoder = encoder_id @@ -320,7 +329,7 @@ func (ts *Stream) run(start int32) error { should_stop = true } else if ts.isSegmentReady(segment + 1) { cmd.Process.Signal(os.Interrupt) - log.Printf("Killing ffmpeg because next segment %d is ready", segment) + log.Printf("Killing ffmpeg %s-%d because next segment %d is ready", ts.handle.getIdentifier(), encoder_id, segment) should_stop = true } } @@ -340,11 +349,11 @@ func (ts *Stream) run(start int32) error { go func() { err := cmd.Wait() if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ExitCode() == 255 { - log.Printf("ffmpeg %d was killed by us", encoder_id) + log.Printf("ffmpeg %s-%d was killed by us", ts.handle.getIdentifier(), encoder_id) } else if err != nil { - log.Printf("ffmpeg %d occured an error: %s: %s", encoder_id, err, stderr.String()) + log.Printf("ffmpeg %s-%d occured an error: %s: %s", ts.handle.getIdentifier(), encoder_id, err, stderr.String()) } else { - log.Printf("ffmpeg %d finished successfully", encoder_id) + log.Printf("ffmpeg %s-%d finished successfully", ts.handle.getIdentifier(), encoder_id) } ts.lock.Lock() @@ -420,7 +429,15 @@ func (ts *Stream) GetSegment(segment int32) (string, error) { } } ts.prerareNextSegements(segment) - return fmt.Sprintf(ts.handle.getOutPath(ts.segments[segment].encoder), segment), nil + return fmt.Sprintf( + "%s/%s/%s", + ts.file.Out, + ts.handle.getIdentifier(), + fmt.Sprintf( + fmt.Sprintf(SegmentNameFormat, ts.segments[segment].encoder), + segment, + ), + ), nil } func (ts *Stream) prerareNextSegements(segment int32) { diff --git a/transcoder/src/videostream.go b/transcoder/src/videostream.go index 95c69a5602..0f65dea0ac 100644 --- a/transcoder/src/videostream.go +++ b/transcoder/src/videostream.go @@ -44,8 +44,8 @@ func (vs *VideoStream) getFlags() Flags { return VideoF } -func (vs *VideoStream) getOutPath(encoder_id int) string { - return fmt.Sprintf("%s/segment-%s-%d-%%d.ts", vs.file.Out, vs.quality, encoder_id) +func (vs *VideoStream) getIdentifier() string { + return fmt.Sprintf("v%d-%s", vs.video.Index, vs.quality) } func closestMultiple(n int32, x int32) int32 {