Skip to content

Commit 747344f

Browse files
committed
Merge remote-tracking branch 'origin/v5' into eanfs-v5
* origin/v5: fix: save dialog into gb.dialogs for pause/resume gb28181 record fix: alias with args fix: download hls recordtype 'ts' not 'hls',mp4 record save pps/sps before idr fix: publish timeout fix: pull-recorder of mp4 feat: save mp4 to s3 and play mp4 from s3
2 parents 2316cc6 + 35df83b commit 747344f

File tree

12 files changed

+349
-136
lines changed

12 files changed

+349
-136
lines changed

alias.go

Lines changed: 81 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ import (
77
"time"
88

99
"google.golang.org/protobuf/types/known/emptypb"
10+
"gorm.io/gorm"
1011
"m7s.live/v5/pb"
1112
)
1213

1314
type AliasStream struct {
1415
*Publisher `gorm:"-:all"`
1516
AutoRemove bool
1617
StreamPath string
17-
Alias string `gorm:"primarykey"`
18+
Alias string `gorm:"primarykey"`
19+
Args url.Values `gorm:"-"`
1820
}
1921

2022
func (a *AliasStream) GetKey() string {
@@ -24,14 +26,49 @@ func (a *AliasStream) GetKey() string {
2426
// StreamAliasDB 用于存储流别名的数据库模型
2527
type StreamAliasDB struct {
2628
AliasStream
27-
CreatedAt time.Time `yaml:"-"`
28-
UpdatedAt time.Time `yaml:"-"`
29+
ArgsString string `gorm:"column:args;type:text"`
30+
CreatedAt time.Time `yaml:"-"`
31+
UpdatedAt time.Time `yaml:"-"`
2932
}
3033

3134
func (StreamAliasDB) TableName() string {
3235
return "stream_alias"
3336
}
3437

38+
// BeforeSave 保存前序列化查询参数
39+
func (db *StreamAliasDB) BeforeSave(tx *gorm.DB) error {
40+
if len(db.Args) > 0 {
41+
db.ArgsString = db.Args.Encode()
42+
} else {
43+
db.ArgsString = ""
44+
}
45+
return nil
46+
}
47+
48+
// BeforeCreate 创建前序列化查询参数
49+
func (db *StreamAliasDB) BeforeCreate(tx *gorm.DB) error {
50+
return db.BeforeSave(tx)
51+
}
52+
53+
// BeforeUpdate 更新前序列化查询参数
54+
func (db *StreamAliasDB) BeforeUpdate(tx *gorm.DB) error {
55+
return db.BeforeSave(tx)
56+
}
57+
58+
// AfterFind 查询后反序列化查询参数
59+
func (db *StreamAliasDB) AfterFind(tx *gorm.DB) error {
60+
if db.ArgsString != "" {
61+
var err error
62+
db.Args, err = url.ParseQuery(db.ArgsString)
63+
if err != nil {
64+
db.Args = nil
65+
}
66+
} else {
67+
db.Args = nil
68+
}
69+
return nil
70+
}
71+
3572
func (s *Server) initStreamAlias() {
3673
if s.DB == nil {
3774
return
@@ -75,13 +112,15 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
75112
return
76113
}
77114
req.StreamPath = strings.TrimPrefix(u.Path, "/")
115+
queryParams := u.Query()
78116
publisher, canReplace := s.Streams.Get(req.StreamPath)
79117
if !canReplace {
80-
defer s.OnSubscribe(req.StreamPath, u.Query())
118+
defer s.OnSubscribe(req.StreamPath, queryParams)
81119
}
82120
if aliasInfo, ok := s.AliasStreams.Get(req.Alias); ok { //modify alias
83121
oldStreamPath := aliasInfo.StreamPath
84122
aliasInfo.AutoRemove = req.AutoRemove
123+
aliasInfo.Args = queryParams
85124
if aliasInfo.StreamPath != req.StreamPath {
86125
aliasInfo.StreamPath = req.StreamPath
87126
if canReplace {
@@ -96,16 +135,18 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
96135
}
97136
// 更新数据库中的别名
98137
if s.DB != nil {
99-
s.DB.Where("alias = ?", req.Alias).Assign(aliasInfo).FirstOrCreate(&StreamAliasDB{
138+
dbAlias := &StreamAliasDB{
100139
AliasStream: *aliasInfo,
101-
})
140+
}
141+
s.DB.Where("alias = ?", req.Alias).Save(dbAlias)
102142
}
103143
s.Info("modify alias", "alias", req.Alias, "oldStreamPath", oldStreamPath, "streamPath", req.StreamPath, "replace", ok && canReplace)
104144
} else { // create alias
105145
aliasInfo := AliasStream{
106146
AutoRemove: req.AutoRemove,
107147
StreamPath: req.StreamPath,
108148
Alias: req.Alias,
149+
Args: queryParams,
109150
}
110151
var pubId uint32
111152
s.AliasStreams.Add(&aliasInfo)
@@ -125,9 +166,10 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
125166
}
126167
// 保存到数据库
127168
if s.DB != nil {
128-
s.DB.Create(&StreamAliasDB{
169+
dbAlias := &StreamAliasDB{
129170
AliasStream: aliasInfo,
130-
})
171+
}
172+
s.DB.Create(dbAlias)
131173
}
132174
s.Info("add alias", "alias", req.Alias, "streamPath", req.StreamPath, "replace", ok && canReplace, "pub", pubId)
133175
}
@@ -143,15 +185,20 @@ func (s *Server) SetStreamAlias(ctx context.Context, req *pb.SetStreamAliasReque
143185
if publisher, hasTarget := s.Streams.Get(req.Alias); hasTarget { // restore stream
144186
aliasStream.TransferSubscribers(publisher)
145187
} else {
146-
var args url.Values
147-
for sub := range aliasStream.Publisher.SubscriberRange {
148-
if sub.StreamPath == req.Alias {
149-
aliasStream.Publisher.RemoveSubscriber(sub)
150-
s.Waiting.Wait(sub)
151-
args = sub.Args
188+
// 优先使用别名保存的查询参数
189+
args := aliasStream.Args
190+
if len(args) == 0 {
191+
// 如果没有保存的查询参数,则从订阅者中获取
192+
for sub := range aliasStream.Publisher.SubscriberRange {
193+
if sub.StreamPath == req.Alias {
194+
aliasStream.Publisher.RemoveSubscriber(sub)
195+
s.Waiting.Wait(sub)
196+
args = sub.Args
197+
break
198+
}
152199
}
153200
}
154-
if args != nil {
201+
if len(args) > 0 {
155202
s.OnSubscribe(req.Alias, args)
156203
}
157204
}
@@ -218,7 +265,21 @@ func (s *Subscriber) processAliasOnStart() (hasInvited bool, done bool) {
218265
done = true
219266
return
220267
} else {
221-
server.OnSubscribe(alias.StreamPath, s.Args)
268+
// 合并参数:先使用别名保存的参数,然后用订阅者传入的参数覆盖同名参数
269+
args := make(url.Values)
270+
// 先复制别名保存的参数
271+
if alias.Args != nil {
272+
for k, v := range alias.Args {
273+
args[k] = append([]string(nil), v...)
274+
}
275+
}
276+
// 用订阅者传入的参数覆盖同名参数
277+
if s.Args != nil {
278+
for k, v := range s.Args {
279+
args[k] = append([]string(nil), v...)
280+
}
281+
}
282+
server.OnSubscribe(alias.StreamPath, args)
222283
hasInvited = true
223284
}
224285
} else {
@@ -227,12 +288,14 @@ func (s *Subscriber) processAliasOnStart() (hasInvited bool, done bool) {
227288
as := AliasStream{
228289
StreamPath: streamPath,
229290
Alias: s.StreamPath,
291+
Args: s.Args,
230292
}
231293
server.AliasStreams.Set(&as)
232294
if server.DB != nil {
233-
server.DB.Where("alias = ?", s.StreamPath).Assign(as).FirstOrCreate(&StreamAliasDB{
295+
dbAlias := &StreamAliasDB{
234296
AliasStream: as,
235-
})
297+
}
298+
server.DB.Where("alias = ?", s.StreamPath).Save(dbAlias)
236299
}
237300
if publisher, ok := server.Streams.Get(streamPath); ok {
238301
publisher.AddSubscriber(s)

pkg/storage/local.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ func (s *LocalStorage) CreateFile(ctx context.Context, path string) (File, error
6363
return file, nil
6464
}
6565

66+
func (s *LocalStorage) OpenFile(ctx context.Context, path string) (File, error) {
67+
// 只读模式打开文件
68+
file, err := os.Open(path)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to open file: %w", err)
71+
}
72+
return file, nil
73+
}
74+
6675
func (s *LocalStorage) Delete(ctx context.Context, path string) error {
6776
return os.Remove(path)
6877
}

pkg/storage/s3.go

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ func (s *S3Storage) CreateFile(ctx context.Context, path string) (File, error) {
110110
storage: s,
111111
objectKey: objectKey,
112112
ctx: ctx,
113+
readOnly: false,
114+
}, nil
115+
}
116+
117+
func (s *S3Storage) OpenFile(ctx context.Context, path string) (File, error) {
118+
objectKey := s.getObjectKey(path)
119+
return &S3File{
120+
storage: s,
121+
objectKey: objectKey,
122+
ctx: ctx,
123+
readOnly: true, // 只读模式
113124
}, nil
114125
}
115126

@@ -238,6 +249,7 @@ type S3File struct {
238249
ctx context.Context
239250
tempFile *os.File // 本地临时文件,用于支持随机访问
240251
filePath string // 临时文件路径
252+
readOnly bool // 只读模式,不上传到S3
241253
}
242254

243255
func (w *S3File) Name() string {
@@ -300,19 +312,24 @@ func (w *S3File) ReadAt(p []byte, off int64) (n int, err error) {
300312
}
301313

302314
func (w *S3File) Sync() error {
315+
// 只读模式不上传
316+
if w.readOnly {
317+
if w.tempFile != nil {
318+
return w.tempFile.Sync()
319+
}
320+
return nil
321+
}
322+
303323
// 如果使用临时文件,先同步到磁盘
304324
if w.tempFile != nil {
305325
fmt.Printf("S3File.Sync: syncing temp file to disk\n")
306326
if err := w.tempFile.Sync(); err != nil {
307327
return fmt.Errorf("failed to sync temp file to disk: %w", err)
308328
}
309-
310-
// 获取文件信息
311-
fileInfo, err := w.tempFile.Stat()
312-
if err != nil {
313-
return fmt.Errorf("failed to stat temp file: %w", err)
329+
// 获取文件大小用于日志
330+
if stat, err := w.tempFile.Stat(); err == nil {
331+
fmt.Printf("[S3File.Sync] tempFile size: %d bytes, path: %s\n", stat.Size(), w.filePath)
314332
}
315-
fmt.Printf("S3File.Sync: temp file size=%d, name=%s\n", fileInfo.Size(), fileInfo.Name())
316333
}
317334

318335
fmt.Printf("S3File.Sync: uploading to S3\n")
@@ -376,23 +393,16 @@ func (w *S3File) Stat() (os.FileInfo, error) {
376393

377394
// uploadTempFile 上传临时文件到S3
378395
func (w *S3File) uploadTempFile() (err error) {
379-
if w.tempFile == nil {
380-
return fmt.Errorf("temp file is nil")
381-
}
382-
383-
// 获取文件信息以记录大小
384-
fileInfo, err := w.tempFile.Stat()
385-
if err != nil {
386-
return fmt.Errorf("failed to stat temp file: %w", err)
387-
}
388-
389-
// 重置文件指针到开始位置,确保从文件头开始读取
396+
// 重置文件指针到开头
390397
if _, err := w.tempFile.Seek(0, 0); err != nil {
391-
return fmt.Errorf("failed to seek to beginning: %w", err)
398+
fmt.Printf("[S3File.uploadTempFile] failed to seek: %v\n", err)
399+
return fmt.Errorf("failed to seek temp file: %w", err)
392400
}
393401

394-
// 记录上传信息
395-
fmt.Printf("Uploading to S3: bucket=%s, key=%s, size=%d\n", w.storage.config.Bucket, w.objectKey, fileInfo.Size())
402+
// 获取文件大小
403+
stat, _ := w.tempFile.Stat()
404+
fmt.Printf("[S3File.uploadTempFile] uploading to S3: bucket=%s, key=%s, size=%d\n",
405+
w.storage.config.Bucket, w.objectKey, stat.Size())
396406

397407
// 上传到S3
398408
_, err = w.storage.uploader.UploadWithContext(w.ctx, &s3manager.UploadInput{
@@ -403,10 +413,11 @@ func (w *S3File) uploadTempFile() (err error) {
403413
})
404414

405415
if err != nil {
416+
fmt.Printf("[S3File.uploadTempFile] upload failed: %v\n", err)
406417
return fmt.Errorf("failed to upload to S3: %w", err)
407418
}
408419

409-
fmt.Printf("Successfully uploaded to S3: bucket=%s, key=%s\n", w.storage.config.Bucket, w.objectKey)
420+
fmt.Printf("[S3File.uploadTempFile] upload successful: %s\n", w.objectKey)
410421
return nil
411422
}
412423

pkg/storage/storage.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type StorageConfig interface {
2727
// Storage 存储接口
2828
type Storage interface {
2929
CreateFile(ctx context.Context, path string) (File, error)
30+
// OpenFile 以只读模式打开文件(不会上传修改)
31+
OpenFile(ctx context.Context, path string) (File, error)
3032
// Delete 删除文件
3133
Delete(ctx context.Context, path string) error
3234

plugin/gb28181/dialog.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ func (d *Dialog) Start() (err error) {
326326
"broadcastPushAfterAck": device.BroadcastPushAfterAck,
327327
})
328328
d.pullCtx.GoToStepConst(StepResponseWait)
329+
d.gb.dialogs.Set(d)
329330
return
330331
}
331332

@@ -522,4 +523,5 @@ func (d *Dialog) Dispose() {
522523
d.Error("listener dialog bye bye", " err", err)
523524
}
524525
}
526+
d.gb.dialogs.Remove(d)
525527
}

plugin/hls/download.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (plugin *HLSPlugin) queryRecordStreams(params *requestParams) ([]m7s.Record
6565
var recordStreams []m7s.RecordStream
6666

6767
// 首先查询HLS记录 (ts)
68-
query := plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type = ?", params.streamPath, "hls")
68+
query := plugin.DB.Model(&m7s.RecordStream{}).Where("stream_path = ? AND type = ?", params.streamPath, "ts")
6969

7070
// 添加时间范围查询条件
7171
if !params.startTime.IsZero() && !params.endTime.IsZero() {
@@ -143,7 +143,7 @@ func (plugin *HLSPlugin) hasOnlyMp4Records(fileInfoList []*fileInfo) bool {
143143
}
144144

145145
for _, info := range fileInfoList {
146-
if info.recordType == "hls" {
146+
if info.recordType == "ts" {
147147
return false
148148
}
149149
}
@@ -155,7 +155,7 @@ func (plugin *HLSPlugin) filterTsFiles(fileInfoList []*fileInfo) []*fileInfo {
155155
var filteredList []*fileInfo
156156

157157
for _, info := range fileInfoList {
158-
if info.recordType == "hls" {
158+
if info.recordType == "ts" {
159159
filteredList = append(filteredList, info)
160160
}
161161
}

plugin/hls/pkg/record.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func (r *Recorder) Run() (err error) {
151151
}
152152
return
153153
}, func(video *mpegts.VideoFrame) (err error) {
154+
r.Event.Duration = uint32(video.Timestamp.Milliseconds() - r.lastTs.Milliseconds())
154155
vr := r.RecordJob.Subscriber.VideoReader
155156
if vr.Value.IDR {
156157
if err = r.writeSegment(video.Timestamp, vr.Value.WriteTime); err != nil {

0 commit comments

Comments
 (0)