Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions runner/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,23 @@ func (r *Runner) RequestStreamEnd(_ context.Context, req *protobuf.StreamEndRequ
}
return nil, status.Errorf(codes.NotFound, "stream not found")
}

func (r *Runner) HandleVOD(_ context.Context, req *protobuf.HandleVODRequest) (*protobuf.HandleVODResponse, error) {
data := map[string]any{
"streamID": req.GetStreamId(),
"streamVersion": req.GetVersion().String(),
"recordingDir": req.GetFilepath(),
}
r.log.Info("HandleVOD data constructed", "data", data)
a := []actions.Action{
actions.CheckCodec,
actions.MkVOD,
actions.CheckVoD,
actions.MkThumb,
}

jID := r.RunAction(a, data, r.log.With("stream_id", req.GetStreamId(), "stream_version", req.GetVersion(), "input", req.GetFilepath()))
r.log.Info("job added", "ID", jID)

return &protobuf.HandleVODResponse{}, nil
}
58 changes: 58 additions & 0 deletions runner/pkg/actions/checkcodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package actions

import (
"context"
"fmt"
"log/slog"
"path"

"github.com/tum-dev/gocast/runner/pkg/ffmpeg"
"github.com/tum-dev/gocast/runner/pkg/metrics"
"github.com/tum-dev/gocast/runner/protobuf"
)

// CheckCodec probes the recording file and checks if it needs re-encoding.
// Sets "needsReencode" to true if the video is not h264 or exceeds 3Mbit/s bitrate,
// or if audio is not AAC.
func CheckCodec(ctx context.Context, logger *slog.Logger, _ chan *protobuf.Notification, d map[string]any, _ *metrics.Broker) error {
recordingDir, ok := d["recordingDir"].(string)
if !ok {
return AbortingError(fmt.Errorf("no recordingDir in context"))
}
recording := path.Join(recordingDir, "playlist.m3u8")

probe, err := ffmpeg.Probe(ctx, recording)
if err != nil {
return AbortingError(fmt.Errorf("ffprobe failed: %w", err))
}

needsReencode := false
for _, stream := range probe.Streams() {
if stream.CodecType == "video" {
if stream.CodecName != "h264" {
needsReencode = true
logger.Info("video codec requires re-encoding", "codec", stream.CodecName)
break
}

if stream.BitRate > 3000000 { // 3 Mbit/s in bits/s
needsReencode = true
logger.Info("video bitrate exceeds 3Mbit/s", "bitrate", stream.BitRate)
break
}
}

if stream.CodecType == "audio" {
if stream.CodecName != "aac" {
needsReencode = true
logger.Info("audio codec requires re-encoding", "codec", stream.CodecName)
break
}
}
}

d["needsReencode"] = needsReencode
logger.Info("codec check completed", "needsReencode", needsReencode)

return nil
}
3 changes: 1 addition & 2 deletions runner/pkg/actions/mkthumb.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ func MkThumb(_ context.Context, logger *slog.Logger, notify chan *protobuf.Notif
return nil
}

// createVideoThumbnail creates a thumbnail from the given video file

const (
thumbnailWidth = 720 // Width of the generated thumbnail in pixels
jpegCompressionQuality = 90 // JPEG compression quality (0-100)
)

// createVideoThumbnail creates a thumbnail from the given video file
func createVideoThumbnail(source string) ([]byte, error) {
g, err := thumbgen.New(source, thumbnailWidth, 1, "", thumbgen.WithJpegCompression(jpegCompressionQuality))
if err != nil {
Expand Down
42 changes: 36 additions & 6 deletions runner/pkg/actions/mkvod.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ func MkVOD(ctx context.Context, logger *slog.Logger, notify chan *protobuf.Notif
if !ok {
return AbortingError(fmt.Errorf("no stream version in context"))
}
recordingDir, ok := d["recordingDir"].(string)
if !ok {
return AbortingError(fmt.Errorf("no recordingDir in context"))
var recording string
if rec, ok := d["recording"]; ok {
recording = rec.(string)
} else if dir, ok := d["recordingDir"]; ok {
recording = path.Join(dir.(string), "playlist.m3u8")
} else {
return AbortingError(fmt.Errorf("no recording or recordingDir in context"))
}

metrics.ConvertingProgresses.With(metrics.With().Stream(streamID).L()).Inc()
Expand All @@ -45,7 +49,24 @@ func MkVOD(ctx context.Context, logger *slog.Logger, notify chan *protobuf.Notif
return AbortingError(fmt.Errorf("create VOD directory: %w", err))
}

err = convertStream(ctx, logger, streamID, path.Join(recordingDir, "playlist.m3u8"), vodDir, "playlist.m3u8")
// Check if re-encoding is needed
var reencode bool
if needsReencode, ok := d["needsReencode"]; ok {
reencode = needsReencode.(bool)
}

var videoCodec, audioCodec string
if reencode {
logger.Info("re-encoding required, transcoding video")
videoCodec = "libx264"
audioCodec = "aac"
} else {
logger.Info("no re-encoding needed, using copy codec")
videoCodec = "copy"
audioCodec = "copy"
}

err = convertStream(ctx, logger, streamID, recording, vodDir, "playlist.m3u8", videoCodec, audioCodec)
if err != nil {
return AbortingError(fmt.Errorf("convert stream: %w", err))
}
Expand All @@ -67,9 +88,18 @@ func MkVOD(ctx context.Context, logger *slog.Logger, notify chan *protobuf.Notif
return nil
}

func convertStream(ctx context.Context, logger *slog.Logger, streamID uint64, streamPath, vodDir string, playlistName string) error {
func convertStream(ctx context.Context, logger *slog.Logger, streamID uint64, streamPath, vodDir string, playlistName string, videoCodec string, audioCodec string) error {
input := "-i " + streamPath
options := "-c copy -f hls -hls_time 20 -hls_playlist_type vod -hls_flags append_list -hls_segment_filename " + path.Join(vodDir, "%05d.ts") + " " + path.Join(vodDir, playlistName)

// Build codec options based on parameters
codecOpts := fmt.Sprintf("-c:v %s -c:a %s", videoCodec, audioCodec)

// Add bitrate limit if re-encoding video
if videoCodec != "copy" {
codecOpts += " -b:v 3M"
}

options := codecOpts + " -f hls -hls_time 20 -hls_playlist_type vod -hls_flags append_list -hls_segment_filename " + path.Join(vodDir, "%05d.ts") + " " + path.Join(vodDir, playlistName)

args := strings.Split(input, " ")
args = append(args, strings.Split(options, " ")...)
Expand Down
24 changes: 24 additions & 0 deletions runner/pkg/ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ func (r *FFProbeResult) Container() string {
return gjson.Get(r.raw, "format.format_name").String()
}

// StreamInfo holds information about a single stream
type StreamInfo struct {
CodecType string
CodecName string
BitRate int64
}

// Streams returns all streams with their codec information
func (r *FFProbeResult) Streams() []StreamInfo {
nStreams := gjson.Get(r.raw, "streams.#").Int()
streams := make([]StreamInfo, 0, nStreams)
for i := 0; i < int(nStreams); i++ {
codecType := gjson.Get(r.raw, fmt.Sprintf("streams.%d.codec_type", i)).String()
codecName := gjson.Get(r.raw, fmt.Sprintf("streams.%d.codec_name", i)).String()
bitRate := gjson.Get(r.raw, fmt.Sprintf("streams.%d.bit_rate", i)).Int()
streams = append(streams, StreamInfo{
CodecType: codecType,
CodecName: codecName,
BitRate: bitRate,
})
}
return streams
}

// FFProbeResult holds the results of a ffprobe execution
type FFProbeResult struct {
raw string
Expand Down
4 changes: 2 additions & 2 deletions runner/protobuf/commons.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions runner/protobuf/notifications.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading