Skip to content

Commit 53ca1b5

Browse files
committed
Revert "fix(storage/mp4): 修复 S3 上传资源泄漏及代码质量问题"
This reverts commit cf163b6.
1 parent 9061f2b commit 53ca1b5

File tree

2 files changed

+58
-68
lines changed

2 files changed

+58
-68
lines changed

pkg/storage/s3.go

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,18 @@ package storage
55
import (
66
"context"
77
"fmt"
8-
"log"
98
"os"
109
"strings"
1110
"time"
1211

1312
"github.com/aws/aws-sdk-go/aws"
14-
"github.com/aws/aws-sdk-go/aws/awserr"
1513
"github.com/aws/aws-sdk-go/aws/credentials"
1614
"github.com/aws/aws-sdk-go/aws/session"
1715
"github.com/aws/aws-sdk-go/service/s3"
1816
"github.com/aws/aws-sdk-go/service/s3/s3manager"
1917
"m7s.live/v5/pkg/config"
2018
)
2119

22-
// isS3NotFoundError 使用 AWS SDK 类型断言判断是否为 404 错误,避免脆弱的字符串匹配
23-
func isS3NotFoundError(err error) bool {
24-
if aerr, ok := err.(awserr.Error); ok {
25-
switch aerr.Code() {
26-
case s3.ErrCodeNoSuchKey, "NotFound", "NoSuchBucket":
27-
return true
28-
}
29-
}
30-
return false
31-
}
32-
3320
// S3StorageConfig S3存储配置
3421
type S3StorageConfig struct {
3522
Endpoint string `desc:"S3服务端点"`
@@ -159,7 +146,8 @@ func (s *S3Storage) Exists(ctx context.Context, path string) (bool, error) {
159146
})
160147

161148
if err != nil {
162-
if isS3NotFoundError(err) {
149+
// 检查是否是404错误
150+
if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") {
163151
return false, nil
164152
}
165153
return false, err
@@ -177,7 +165,7 @@ func (s *S3Storage) GetSize(ctx context.Context, path string) (int64, error) {
177165
})
178166

179167
if err != nil {
180-
if isS3NotFoundError(err) {
168+
if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") {
181169
return 0, ErrFileNotFound
182170
}
183171
return 0, err
@@ -336,13 +324,20 @@ func (w *S3File) Sync() error {
336324
return nil
337325
}
338326

339-
// 先将临时文件同步到磁盘
327+
// 如果使用临时文件,先同步到磁盘
340328
if w.tempFile != nil {
341329
if err := w.tempFile.Sync(); err != nil {
342330
return err
343331
}
332+
// 获取文件大小用于日志
333+
if stat, err := w.tempFile.Stat(); err == nil {
334+
fmt.Printf("[S3File.Sync] tempFile size: %d bytes, path: %s\n", stat.Size(), w.filePath)
335+
}
336+
}
337+
if err := w.uploadTempFile(); err != nil {
338+
return err
344339
}
345-
return w.uploadTempFile()
340+
return nil
346341
}
347342

348343
func (w *S3File) Seek(offset int64, whence int) (int64, error) {
@@ -358,18 +353,17 @@ func (w *S3File) Seek(offset int64, whence int) (int64, error) {
358353
}
359354

360355
func (w *S3File) Close() error {
361-
// 用 defer 确保无论上传成功与否,临时文件句柄和磁盘文件都被清理
362-
defer func() {
363-
if w.tempFile != nil {
364-
w.tempFile.Close()
365-
w.tempFile = nil
366-
}
367-
if w.filePath != "" {
368-
os.Remove(w.filePath)
369-
w.filePath = ""
370-
}
371-
}()
372-
return w.Sync()
356+
if err := w.Sync(); err != nil {
357+
return err
358+
}
359+
if w.tempFile != nil {
360+
w.tempFile.Close()
361+
}
362+
// 清理临时文件
363+
if w.filePath != "" {
364+
os.Remove(w.filePath)
365+
}
366+
return nil
373367
}
374368

375369
// createTempFile 创建临时文件
@@ -385,30 +379,23 @@ func (w *S3File) createTempFile() error {
385379
}
386380

387381
func (w *S3File) Stat() (os.FileInfo, error) {
388-
if w.tempFile == nil {
389-
return nil, fmt.Errorf("s3 file not initialized")
390-
}
391382
return w.tempFile.Stat()
392383
}
393384

394385
// uploadTempFile 上传临时文件到S3
395386
func (w *S3File) uploadTempFile() (err error) {
396387
// 重置文件指针到开头
397-
if _, err = w.tempFile.Seek(0, 0); err != nil {
388+
if _, err := w.tempFile.Seek(0, 0); err != nil {
389+
fmt.Printf("[S3File.uploadTempFile] failed to seek: %v\n", err)
398390
return fmt.Errorf("failed to seek temp file: %w", err)
399391
}
400392

393+
// 获取文件大小
401394
stat, _ := w.tempFile.Stat()
402-
log.Printf("[S3] uploading: bucket=%s key=%s size=%d", w.storage.config.Bucket, w.objectKey, stat.Size())
403-
404-
// 使用带超时的 background context,避免因录像 context 取消而中断上传
405-
timeout := w.storage.config.Timeout
406-
if timeout <= 0 {
407-
timeout = 30 * time.Second
408-
}
409-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
410-
defer cancel()
395+
fmt.Printf("[S3File.uploadTempFile] uploading to S3: bucket=%s, key=%s, size=%d\n",
396+
w.storage.config.Bucket, w.objectKey, stat.Size())
411397

398+
// 构建上传请求,携带用户自定义元数据
412399
uploadInput := &s3manager.UploadInput{
413400
Bucket: aws.String(w.storage.config.Bucket),
414401
Key: aws.String(w.objectKey),
@@ -418,17 +405,21 @@ func (w *S3File) uploadTempFile() (err error) {
418405
if len(w.metadata) > 0 {
419406
uploadInput.Metadata = aws.StringMap(w.metadata)
420407
}
408+
// 上传到S3
409+
_, err = w.storage.uploader.UploadWithContext(w.ctx, uploadInput)
421410

422-
if _, err = w.storage.uploader.UploadWithContext(ctx, uploadInput); err != nil {
411+
if err != nil {
412+
fmt.Printf("[S3File.uploadTempFile] upload failed: %v\n", err)
423413
return fmt.Errorf("failed to upload to S3: %w", err)
424414
}
425415

426-
log.Printf("[S3] upload successful: %s", w.objectKey)
416+
fmt.Printf("[S3File.uploadTempFile] upload successful: %s\n", w.objectKey)
427417
return nil
428418
}
429419

430420
// downloadToTemp 下载S3对象到本地临时文件
431421
func (w *S3File) downloadToTemp() error {
422+
// 创建临时文件
432423
tempFile, err := os.CreateTemp("", "s3reader_*.tmp")
433424
if err != nil {
434425
return fmt.Errorf("failed to create temp file: %w", err)
@@ -437,6 +428,7 @@ func (w *S3File) downloadToTemp() error {
437428
w.tempFile = tempFile
438429
w.filePath = tempFile.Name()
439430

431+
// 下载S3对象
440432
_, err = w.storage.downloader.DownloadWithContext(w.ctx, tempFile, &s3.GetObjectInput{
441433
Bucket: aws.String(w.storage.config.Bucket),
442434
Key: aws.String(w.objectKey),
@@ -445,14 +437,15 @@ func (w *S3File) downloadToTemp() error {
445437
if err != nil {
446438
tempFile.Close()
447439
os.Remove(w.filePath)
448-
if isS3NotFoundError(err) {
440+
if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") {
449441
return ErrFileNotFound
450442
}
451443
return fmt.Errorf("failed to download from S3: %w", err)
452444
}
453445

454446
// 重置文件指针到开始位置
455-
if _, err = tempFile.Seek(0, 0); err != nil {
447+
_, err = tempFile.Seek(0, 0)
448+
if err != nil {
456449
tempFile.Close()
457450
os.Remove(w.filePath)
458451
return fmt.Errorf("failed to seek temp file: %w", err)

plugin/mp4/pkg/record.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
task "github.com/langhuihui/gotask"
13+
"gorm.io/gorm"
1314
m7s "m7s.live/v5"
1415
"m7s.live/v5/pkg"
1516
"m7s.live/v5/pkg/codec"
@@ -29,48 +30,43 @@ var writeTrailerQueueTask WriteTrailerQueueTask
2930

3031
type writeTrailerTask struct {
3132
task.Task
32-
muxer *Muxer
33-
file storage.File
34-
filePath string
35-
durationMs uint32 // 录像时长(毫秒),用于上传 S3 元数据
33+
muxer *Muxer
34+
file storage.File
35+
filePath string
36+
recordID uint // 录像记录ID
37+
targetStorage map[string]any // 目标存储配置
38+
deleteLocal bool // 上传成功后是否删除本地文件
39+
db *gorm.DB // 数据库连接
40+
durationMs uint32 // 录像时长(毫秒),用于上传 S3 元数据
3641
}
3742

3843
func (task *writeTrailerTask) Start() (err error) {
3944
task.Info("write trailer start")
40-
if err = task.muxer.WriteTrailer(task.file); err != nil {
45+
err = task.muxer.WriteTrailer(task.file)
46+
if err != nil {
4147
task.Error("write trailer", "err", err)
42-
// 关闭文件,忽略关闭错误以保留原始错误
4348
if task.file != nil {
44-
task.file.Close()
45-
task.file = nil
49+
if errClose := task.file.Close(); errClose != nil {
50+
return errClose
51+
}
4652
}
4753
}
4854
return
4955
}
5056

5157
const BeforeMdatData = 16 // free box + mdat box header or big mdat box header
52-
5358
// 将 moov 从末尾移动到前方
5459
// 将 ftyp + free(optional) + moov + mdat 写入临时文件, 然后替换原文件
5560
func (t *writeTrailerTask) Run() (err error) {
5661
t.Info("write trailer")
57-
58-
// 确保任何错误路径下 t.file 都被关闭
59-
defer func() {
60-
if err != nil && t.file != nil {
61-
t.file.Close()
62-
t.file = nil
63-
}
64-
}()
65-
6662
var temp *os.File
6763
temp, err = os.CreateTemp("", "*.mp4")
6864
if err != nil {
6965
t.Error("create temp file", "err", err)
7066
return
7167
}
68+
7269
defer os.Remove(temp.Name())
73-
defer temp.Close()
7470

7571
_, err = t.file.Seek(0, io.SeekStart)
7672
if err != nil {
@@ -125,11 +121,12 @@ func (t *writeTrailerTask) Run() (err error) {
125121
}
126122
if err = t.file.Close(); err != nil {
127123
t.Error("close file", "err", err)
128-
t.file = nil // 防止 defer 重复关闭
129124
return
130125
}
131-
t.file = nil // 标记已关闭,防止 defer 重复关闭
132-
// temp 由 defer temp.Close() 负责关闭
126+
if err = temp.Close(); err != nil {
127+
t.Error("close temp file", "err", err)
128+
}
129+
133130
return
134131
}
135132

0 commit comments

Comments
 (0)