diff --git a/transcode/transcode.go b/transcode/transcode.go index d187b63a5..ba67f27c3 100644 --- a/transcode/transcode.go +++ b/transcode/transcode.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -22,7 +23,7 @@ import ( const ( UploadTimeout = 5 * time.Minute - TransmuxStorageDir = "/tmp/transmux_stage" + SegmentChannelSize = 10 ) type TranscodeSegmentRequest struct { @@ -53,6 +54,12 @@ type TranscodeSegmentRequest struct { IsClip bool } +type TranscodedSegmentInfo struct { + RequestID string + RenditionName string + SegmentIndex int +} + func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName string, inputInfo video.InputVideo, broadcaster clients.BroadcasterClient) ([]video.OutputVideo, int, error) { log.AddContext(transcodeRequest.RequestID, "source_manifest", transcodeRequest.SourceManifestURL, "stream_name", streamName) log.Log(transcodeRequest.RequestID, "RunTranscodeProcess (v2) Beginning") @@ -148,9 +155,16 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } } + // Create a buffered channel where transcoded segments are sent to be written to disk + segmentChannel := make(chan TranscodedSegmentInfo, SegmentChannelSize) + + // Create a waitgroup to synchronize when the disk writing goroutine finishes + var wg sync.WaitGroup + + // Setup parallel transcode sessions var jobs *ParallelTranscoding jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error { - err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster) + err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, hlsTargetURL, transcodedStats, &renditionList, broadcaster, segmentChannel) segmentsCount++ if err != nil { return err @@ -162,12 +176,55 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st } return nil }) + + var TransmuxStorageDir string + if transcodeRequest.GenerateMP4 { + var err error + // Create folder to hold transmux-ed files in local storage temporarily + TransmuxStorageDir, err = os.MkdirTemp(os.TempDir(), "transmux_stage_"+transcodeRequest.RequestID+"_") + if err != nil && !os.IsExist(err) { + log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) + return outputs, segmentsCount, err + } + defer os.RemoveAll(TransmuxStorageDir) + + // Start the disk-writing (consumer) goroutine + wg.Add(1) + go func(transmuxTopLevelDir string, renditionList *video.TRenditionList) { + var segmentBatch []TranscodedSegmentInfo + defer wg.Done() + + // Keep checking for new segments in the buffered channel + for segInfo := range segmentChannel { + segmentBatch = append(segmentBatch, segInfo) + // Begin writing to disk if at-least 50% of buffered channel is full + if len(segmentBatch) >= SegmentChannelSize/2 { + writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) + fmt.Println("XXX: writing to disk here because > 10", segInfo.RenditionName, segInfo.SegmentIndex) + segmentBatch = nil + } + } + // Handle any remaining segments after the channel is closed + if len(segmentBatch) > 0 { + writeSegmentsToDisk(transmuxTopLevelDir, renditionList, segmentBatch) + fmt.Println("XXX: writing to disk here after channel close") + } + }(TransmuxStorageDir, &renditionList) + } + + // Start the transcoding (producer) goroutines jobs.Start() if err = jobs.Wait(); err != nil { // return first error to caller return outputs, segmentsCount, err } + // If the disk-writing gorouine was started, then close the segment channel to + // signal that no more segments will be sent. This will be a no-op if MP4s are not requested. + close(segmentChannel) + // Wait for disk-writing goroutine to finish. This will be a no-op if MP4s are not requested. + wg.Wait() + // Build the manifests and push them to storage manifestURL, err := clients.GenerateAndUploadManifests(sourceManifest, hlsTargetURL.String(), transcodedStats, transcodeRequest.IsClip) if err != nil { @@ -193,14 +250,6 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st var concatFiles []string for rendition, segments := range renditionList.RenditionSegmentTable { - // Create folder to hold transmux-ed files in local storage temporarily - TransmuxStorageDir, err := os.MkdirTemp(os.TempDir(), "transmux_stage_") - if err != nil && !os.IsExist(err) { - log.Log(transcodeRequest.RequestID, "failed to create temp dir for transmuxing", "dir", TransmuxStorageDir, "err", err) - return outputs, segmentsCount, err - } - defer os.RemoveAll(TransmuxStorageDir) - // Create a single .ts file for a given rendition by concatenating all segments in order if rendition == "low-bitrate" { // skip mp4 generation for low-bitrate profile @@ -210,14 +259,8 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st concatFiles = append(concatFiles, concatTsFileName) defer os.Remove(concatTsFileName) - // For now, use the stream based concat for clipping only and file based concat for everything else. - // Eventually, all mp4 generation can be moved to stream based concat once proven effective. var totalBytes int64 - if transcodeRequest.IsClip { - totalBytes, err = video.ConcatTS(concatTsFileName, segments, true) - } else { - totalBytes, err = video.ConcatTS(concatTsFileName, segments, false) - } + totalBytes, err = video.ConcatTS(concatTsFileName, segments, true) if err != nil { log.Log(transcodeRequest.RequestID, "error concatenating .ts", "file", concatTsFileName, "err", err) continue @@ -359,6 +402,29 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st return outputs, segmentsCount, nil } +func writeSegmentsToDisk(transmuxTopLevelDir string, renditionList *video.TRenditionList, segmentBatch []TranscodedSegmentInfo) (int64, error) { + for _, segInfo := range segmentBatch { + + // All accesses to renditionList and segmentList is protected by a mutex behind the scenes + segmentList := renditionList.GetSegmentList(segInfo.RenditionName) + segmentData := segmentList.GetSegment(segInfo.SegmentIndex) + segmentFilename := filepath.Join(transmuxTopLevelDir, segInfo.RequestID+"_"+segInfo.RenditionName+"_"+strconv.Itoa(segInfo.SegmentIndex)+".ts") + segmentFile, err := os.Create(segmentFilename) + if err != nil { + return 0, fmt.Errorf("error creating .ts file to write transcoded segment data err: %w", err) + } + defer segmentFile.Close() + _, err = segmentFile.Write(segmentData) + if err != nil { + return 0, fmt.Errorf("error writing segment err: %w", err) + } + // "Delete" buffered segment data from memory in hopes the garbage-collector releases it + segmentList.RemoveSegmentData(segInfo.SegmentIndex) + + } + return 0, nil +} + func uploadMp4Files(basePath *url.URL, mp4OutputFiles []string, prefix string) ([]video.OutputVideoFile, error) { var mp4OutputsPre []video.OutputVideoFile // e. Upload all mp4 related output files @@ -432,6 +498,7 @@ func transcodeSegment( transcodedStats []*video.RenditionStats, renditionList *video.TRenditionList, broadcaster clients.BroadcasterClient, + segmentChannel chan<- TranscodedSegmentInfo, ) error { start := time.Now() @@ -516,6 +583,15 @@ func transcodeSegment( // be generated i.e. all profiles for mp4 inputs and only highest quality // rendition for hls inputs like recordings. segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData) + + // send this transcoded segment to the segment channel so that it can be written + // to disk in parallel + segmentChannel <- TranscodedSegmentInfo{ + RequestID: transcodeRequest.RequestID, + RenditionName: transcodedSegment.Name, // Use actual rendition name + SegmentIndex: segment.Index, // Use actual segment index + } + } } diff --git a/video/media.go b/video/media.go index 71d1c7c4d..1eba7173c 100644 --- a/video/media.go +++ b/video/media.go @@ -39,6 +39,12 @@ func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) { s.mu.Unlock() } +func (s *TSegmentList) RemoveSegmentData(segIdx int) { + s.mu.Lock() + s.SegmentDataTable[segIdx] = []byte{} + s.mu.Unlock() +} + func (s *TSegmentList) GetSegment(segIdx int) []byte { s.mu.Lock() defer s.mu.Unlock() diff --git a/video/transmux.go b/video/transmux.go index 84125f8ca..48c863b15 100644 --- a/video/transmux.go +++ b/video/transmux.go @@ -138,19 +138,26 @@ func ConcatTS(tsFileName string, segmentsList *TSegmentList, useStreamBasedConca }() // Write each segment to disk and add segment filename to the text file - for segName, segData := range segmentsList.GetSortedSegments() { + for segName, _ := range segmentsList.GetSortedSegments() { // Open a new file to write each segment to disk segmentFilename := fileBaseWithoutExt + "_" + strconv.Itoa(segName) + ".ts" - segmentFile, err := os.Create(segmentFilename) + /* segmentFile, err := os.Create(segmentFilename) + if err != nil { + return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFilename, err) + } + defer segmentFile.Close() + // Write the segment data to disk + segBytes, err := segmentFile.Write(segmentsList.SegmentDataTable[segData]) + if err != nil { + return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err) + } + */ + fileInfo, err := os.Stat(segmentFilename) if err != nil { - return totalBytes, fmt.Errorf("error creating individual segment file (%s) err: %w", segmentFilename, err) - } - defer segmentFile.Close() - // Write the segment data to disk - segBytes, err := segmentFile.Write(segmentsList.SegmentDataTable[segData]) - if err != nil { - return totalBytes, fmt.Errorf("error writing segment %d err: %w", segName, err) + return totalBytes, fmt.Errorf("error stat segment %d err: %w", segName, err) } + segBytes := fileInfo.Size() + segmentFilenames = append(segmentFilenames, segmentFilename) totalBytes = totalBytes + int64(segBytes) // Add filename to the text file