Skip to content

Commit eadf20f

Browse files
committed
fix: Optimize the writing of metadata into FLV files
1 parent c5aa26d commit eadf20f

File tree

1 file changed

+76
-41
lines changed

1 file changed

+76
-41
lines changed

plugin/flv/pkg/record.go

Lines changed: 76 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"log/slog"
78
"os"
89
"path/filepath"
910
"time"
@@ -16,6 +17,27 @@ import (
1617
rtmp "m7s.live/v5/plugin/rtmp/pkg"
1718
)
1819

20+
// MetaData 结构体保存 writeMetaTag 需要的编解码器元数据
21+
type MetaData struct {
22+
// 音频相关
23+
HasAudio bool
24+
AudioCodecID int
25+
AudioSampleRate int
26+
AudioSampleSize int
27+
AudioChannels int
28+
29+
// 视频相关
30+
HasVideo bool
31+
VideoCodecID int
32+
VideoWidth int
33+
VideoHeight int
34+
VideoFPS int
35+
VideoBPS int
36+
37+
// 日志记录器
38+
Logger *slog.Logger
39+
}
40+
1941
type WriteFlvMetaTagQueueTask struct {
2042
task.Work
2143
}
@@ -39,6 +61,9 @@ func (task *writeMetaTagTask) Start() (err error) {
3961
err = task.file.Close()
4062
if info, err := task.file.Stat(); err == nil && info.Size() == 0 {
4163
err = os.Remove(info.Name())
64+
if err != nil {
65+
task.Error("writeMetaTagTask", "remove file err", err)
66+
}
4267
}
4368
}()
4469
var tempFile *os.File
@@ -78,9 +103,8 @@ func (task *writeMetaTagTask) Start() (err error) {
78103
}
79104
}
80105

81-
func writeMetaTag(file storage.File, suber *m7s.Subscriber, filepositions []uint64, times []float64, duration *int64) {
82-
ar, vr := suber.AudioReader, suber.VideoReader
83-
hasAudio, hasVideo := ar != nil, vr != nil
106+
func writeMetaTag(file storage.File, metadata *MetaData, filepositions []uint64, times []float64, duration *int64) {
107+
hasAudio, hasVideo := metadata.HasAudio, metadata.HasVideo
84108
var amf rtmp.AMF
85109
metaData := rtmp.EcmaArray{
86110
"MetaDataCreator": "m7s/" + m7s.Version,
@@ -94,21 +118,19 @@ func writeMetaTag(file storage.File, suber *m7s.Subscriber, filepositions []uint
94118
}
95119
var flags byte
96120
if hasAudio {
97-
ctx := ar.Track.ICodecCtx.GetBase().(pkg.IAudioCodecCtx)
98121
flags |= (1 << 2)
99-
metaData["audiocodecid"] = int(rtmp.ParseAudioCodec(ctx.FourCC()))
100-
metaData["audiosamplerate"] = ctx.GetSampleRate()
101-
metaData["audiosamplesize"] = ctx.GetSampleSize()
102-
metaData["stereo"] = ctx.GetChannels() == 2
122+
metaData["audiocodecid"] = metadata.AudioCodecID
123+
metaData["audiosamplerate"] = metadata.AudioSampleRate
124+
metaData["audiosamplesize"] = metadata.AudioSampleSize
125+
metaData["stereo"] = metadata.AudioChannels == 2
103126
}
104127
if hasVideo {
105-
ctx := vr.Track.ICodecCtx.GetBase().(pkg.IVideoCodecCtx)
106128
flags |= 1
107-
metaData["videocodecid"] = int(rtmp.ParseVideoCodec(ctx.FourCC()))
108-
metaData["width"] = ctx.Width()
109-
metaData["height"] = ctx.Height()
110-
metaData["framerate"] = vr.Track.FPS
111-
metaData["videodatarate"] = vr.Track.BPS
129+
metaData["videocodecid"] = metadata.VideoCodecID
130+
metaData["width"] = metadata.VideoWidth
131+
metaData["height"] = metadata.VideoHeight
132+
metaData["framerate"] = metadata.VideoFPS
133+
metaData["videodatarate"] = metadata.VideoBPS
112134
metaData["keyframes"] = map[string]any{
113135
"filepositions": filepositions,
114136
"times": times,
@@ -133,7 +155,7 @@ func writeMetaTag(file storage.File, suber *m7s.Subscriber, filepositions []uint
133155
flags: flags,
134156
metaData: marshals,
135157
}
136-
wrTask.Logger = suber.Logger.With("file", file.Name())
158+
wrTask.Logger = metadata.Logger.With("file", file.Name())
137159
writeMetaTagQueueTask.AddTask(wrTask)
138160
}
139161

@@ -143,8 +165,9 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
143165

144166
type Recorder struct {
145167
m7s.DefaultRecorder
146-
writer *FlvWriter
147-
file storage.File
168+
writer *FlvWriter
169+
file storage.File
170+
metadata *MetaData // 保存编解码器元数据,避免在OnStop回调时指针失效
148171
}
149172

150173
var CustomFileName = func(job *m7s.RecordJob) string {
@@ -178,29 +201,6 @@ func (r *Recorder) createStream(start time.Time) (err error) {
178201
if err != nil {
179202
return
180203
}
181-
// 写入序列头(如果已知)以保证每个分片可独立回放
182-
// 优先使用 Subscriber 的 VideoReader/AudioReader 的 codec context 的 sequence frame
183-
sub := r.RecordJob.Subscriber
184-
if sub != nil {
185-
// 视频序列头
186-
if sub.VideoReader != nil && sub.VideoReader.Track != nil && sub.VideoReader.Track.ICodecCtx != nil {
187-
if seqCtx, ok := sub.VideoReader.Track.ICodecCtx.(pkg.ISequenceCodecCtx[*rtmp.VideoFrame]); ok {
188-
seq := seqCtx.GetSequenceFrame()
189-
if seq != nil && seq.Size > 0 {
190-
_ = r.writer.WriteTag(FLV_TAG_TYPE_VIDEO, 0, uint32(seq.Size), seq.Buffers...)
191-
}
192-
}
193-
}
194-
// 音频序列头
195-
if sub.AudioReader != nil && sub.AudioReader.Track != nil && sub.AudioReader.Track.ICodecCtx != nil {
196-
if seqCtx, ok := sub.AudioReader.Track.ICodecCtx.(pkg.ISequenceCodecCtx[*rtmp.AudioFrame]); ok {
197-
seq := seqCtx.GetSequenceFrame()
198-
if seq != nil && seq.Size > 0 {
199-
_ = r.writer.WriteTag(FLV_TAG_TYPE_AUDIO, 0, uint32(seq.Size), seq.Buffers...)
200-
}
201-
}
202-
}
203-
}
204204
return
205205
}
206206

@@ -230,10 +230,14 @@ func (r *Recorder) Run() (err error) {
230230
var duration int64
231231
ctx := &r.RecordJob
232232
suber := ctx.Subscriber
233+
233234
noFragment := ctx.RecConf.Fragment == 0 || ctx.RecConf.Append
235+
suber.OnStop(func() {
236+
writeMetaTag(r.file, r.metadata, filepositions, times, &duration)
237+
})
234238
checkFragment := func(absTime uint32, writeTime time.Time) {
235239
if duration = int64(absTime); time.Duration(duration)*time.Millisecond >= ctx.RecConf.Fragment {
236-
writeMetaTag(r.file, suber, filepositions, times, &duration)
240+
writeMetaTag(r.file, r.metadata, filepositions, times, &duration)
237241
r.writeTailer(writeTime)
238242
filepositions = []uint64{0}
239243
times = []float64{0}
@@ -259,19 +263,50 @@ func (r *Recorder) Run() (err error) {
259263
}
260264

261265
return m7s.PlayBlock(ctx.Subscriber, func(audio *rtmp.AudioFrame) (err error) {
266+
// 初始化元数据结构体(如果还没有)
267+
if r.metadata == nil {
268+
r.metadata = &MetaData{Logger: suber.Logger}
269+
}
270+
271+
// 如果还没有设置音频参数,并且当前有音频流,则设置音频参数
272+
if !r.metadata.HasAudio && suber.AudioReader != nil {
273+
r.metadata.HasAudio = true
274+
audioCtx := suber.AudioReader.Track.ICodecCtx.GetBase().(pkg.IAudioCodecCtx)
275+
r.metadata.AudioCodecID = int(rtmp.ParseAudioCodec(audioCtx.FourCC()))
276+
r.metadata.AudioSampleRate = audioCtx.GetSampleRate()
277+
r.metadata.AudioSampleSize = audioCtx.GetSampleSize()
278+
r.metadata.AudioChannels = audioCtx.GetChannels()
279+
}
280+
262281
if suber.VideoReader == nil && !noFragment {
263282
checkFragment(suber.AudioReader.AbsTime, suber.AudioReader.Value.WriteTime)
264283
}
265284
err = r.writer.WriteTag(FLV_TAG_TYPE_AUDIO, suber.AudioReader.AbsTime, uint32(audio.Size), audio.Buffers...)
266285
offset += int64(audio.Size + 15)
267286
return
268287
}, func(video *rtmp.VideoFrame) (err error) {
288+
// 初始化元数据结构体(如果还没有)
289+
if r.metadata == nil {
290+
r.metadata = &MetaData{Logger: suber.Logger}
291+
}
292+
269293
if r.Event.StartTime.IsZero() {
270294
err = r.createStream(suber.VideoReader.Value.WriteTime)
271295
if err != nil {
272296
return err
273297
}
274298
}
299+
300+
// 如果还没有设置视频参数,并且当前有视频流,则设置视频参数
301+
if !r.metadata.HasVideo && suber.VideoReader != nil {
302+
r.metadata.HasVideo = true
303+
videoCtx := suber.VideoReader.Track.ICodecCtx.GetBase().(pkg.IVideoCodecCtx)
304+
r.metadata.VideoCodecID = int(rtmp.ParseVideoCodec(videoCtx.FourCC()))
305+
r.metadata.VideoWidth = videoCtx.Width()
306+
r.metadata.VideoHeight = videoCtx.Height()
307+
r.metadata.VideoFPS = suber.VideoReader.Track.FPS
308+
r.metadata.VideoBPS = suber.VideoReader.Track.BPS
309+
}
275310
if suber.VideoReader.Value.IDR {
276311
filepositions = append(filepositions, uint64(offset))
277312
times = append(times, float64(suber.VideoReader.AbsTime)/1000)

0 commit comments

Comments
 (0)