Skip to content

Commit 2dfbeee

Browse files
committed
feat: use storage from storage config
1 parent eadf20f commit 2dfbeee

File tree

4 files changed

+70
-8
lines changed

4 files changed

+70
-8
lines changed

pkg/storage/local.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,18 @@ func (s *LocalStorage) CreateFile(ctx context.Context, path string) (File, error
202202
return file, nil
203203
}
204204

205+
func (s *LocalStorage) OpenFileFromStorageLevel(ctx context.Context, path string, storageLevel int) (File, error) {
206+
if storageLevel == 0 {
207+
storageLevel = 1
208+
}
209+
fullPath := filepath.Join(s.GetStoragePath(storageLevel), path)
210+
file, err := os.Open(fullPath)
211+
if err != nil {
212+
return nil, fmt.Errorf("failed to open file: %w", err)
213+
}
214+
return file, nil
215+
}
216+
205217
func (s *LocalStorage) OpenFile(ctx context.Context, path string) (File, error) {
206218
// 选择存储路径
207219
basePath, err := s.selectStoragePath()

plugin/mp4/api.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,14 @@ func (p *MP4Plugin) downloadSingleFile(stream *m7s.RecordStream, flag mp4.Flag,
125125
if isLocalStorage {
126126
// 本地存储:根据存储级别获取完整路径后打开文件
127127
if localStorage, ok := st.(*storage.LocalStorage); ok {
128-
fullPath := localStorage.GetFullPath(stream.FilePath, stream.StorageLevel)
129-
file, err = os.Open(fullPath)
128+
file, err = localStorage.OpenFileFromStorageLevel(p, stream.FilePath, stream.StorageLevel)
130129
if err != nil {
131130
http.Error(w, fmt.Sprintf("failed to open local file: %v", err), http.StatusInternalServerError)
132-
p.Error("failed to open local file", "err", err, "path", fullPath)
131+
p.Error("failed to open local file", "err", err, "path", stream.FilePath, "storageLevel", stream.StorageLevel)
133132
return
134133
}
135134
defer file.Close()
136-
p.Info("reading file for fmp4 conversion from local storage", "storageLevel", stream.StorageLevel, "path", fullPath)
135+
p.Info("reading file for fmp4 conversion from local storage", "storageLevel", stream.StorageLevel, "path", stream.FilePath)
137136
} else {
138137
// 类型不匹配,使用 OpenFile 作为兜底
139138
file, err = st.OpenFile(context.Background(), stream.FilePath)

plugin/mp4/pkg/demux-range.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package mp4
33
import (
44
"context"
55
"log/slog"
6+
"m7s.live/v5/pkg/storage"
67
"net"
78
"os"
9+
"path/filepath"
810
"reflect"
911
"time"
1012

@@ -21,23 +23,71 @@ type DemuxerRange struct {
2123
AudioCodec, VideoCodec codec.ICodecCtx
2224
OnAudio, OnVideo func(box.Sample) error
2325
OnCodec func(codec.ICodecCtx, codec.ICodecCtx)
26+
storage storage.Storage
2427
}
2528

2629
func (d *DemuxerRange) Demux(ctx context.Context) error {
2730
var ts, tsOffset int64
2831
var audioInitialized, videoInitialized bool
32+
st := d.storage
33+
var globalStorageType string
34+
var file storage.File
35+
var err error
36+
if st != nil {
37+
globalStorageType = st.GetKey()
38+
}
2939
for _, stream := range d.Streams {
3040
// 检查流的时间范围是否在指定范围内
3141
if stream.EndTime.Before(d.StartTime) || stream.StartTime.After(d.EndTime) {
3242
continue
3343
}
44+
if filepath.IsAbs(stream.FilePath) {
45+
file, err = os.Open(stream.FilePath)
46+
if err != nil {
47+
continue
48+
}
49+
} else {
50+
useGlobalStorage := st != nil && globalStorageType == stream.StorageType
51+
isLocalStorage := stream.StorageType == string(storage.StorageTypeLocal) || stream.StorageType == ""
52+
if useGlobalStorage {
53+
if isLocalStorage {
54+
if localStorage, ok := st.(*storage.LocalStorage); ok {
55+
fullPath := localStorage.GetFullPath(stream.FilePath, stream.StorageLevel)
56+
file, err = os.Open(fullPath)
57+
if err != nil {
58+
continue
59+
}
60+
} else {
61+
// 类型不匹配,使用 OpenFile 作为兜底
62+
file, err = st.OpenFile(ctx, stream.FilePath)
63+
if err != nil {
64+
continue
65+
}
66+
}
67+
} else {
68+
filePath, err := st.GetURL(ctx, stream.FilePath)
69+
if err != nil || filePath == "" {
70+
continue
71+
}
72+
file, err = st.OpenFile(ctx, filePath)
73+
if err != nil {
74+
continue
75+
}
76+
}
77+
} else {
78+
file, err = os.Open(stream.FilePath)
79+
if err != nil {
80+
continue
81+
}
82+
}
83+
}
3484

3585
// 保存上一个文件的最后时间戳,用于跨文件连续
3686
baseOffset := ts
37-
file, err := os.Open(stream.FilePath)
38-
if err != nil {
39-
continue
40-
}
87+
//file, err := os.Open(stream.FilePath)
88+
//if err != nil {
89+
// continue
90+
//}
4191
defer file.Close()
4292

4393
demuxer := NewDemuxer(file)

plugin/mp4/pkg/pull-recorder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (p *RecordReader) Run() (err error) {
6464
writer.PublishVideoWriter = m7s.NewPublishVideoWriter[*VideoFrame](publisher, allocator)
6565
}
6666
},
67+
storage: pullJob.Plugin.Server.Storage,
6768
}
6869
demuxerRange.OnAudio = func(a box.Sample) error {
6970
if publisher.Paused != nil {

0 commit comments

Comments
 (0)