Skip to content

Commit a7ad20a

Browse files
committed
Merge branch 'v5' into eanfs-v5
* v5: fix s3 upload file failed fix:update flv plugin routing settings (langhuihui#373)
2 parents d45d47a + 6d323a6 commit a7ad20a

File tree

2 files changed

+71
-11
lines changed

2 files changed

+71
-11
lines changed

pkg/storage/s3.go

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"os"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/aws/aws-sdk-go/aws"
@@ -250,13 +251,18 @@ type S3File struct {
250251
tempFile *os.File // 本地临时文件,用于支持随机访问
251252
filePath string // 临时文件路径
252253
readOnly bool // 只读模式,不上传到S3
254+
255+
mu sync.Mutex // 保护并发访问
253256
}
254257

255258
func (w *S3File) Name() string {
256259
return w.objectKey
257260
}
258261

259262
func (w *S3File) Write(p []byte) (n int, err error) {
263+
// 并发安全写入
264+
w.mu.Lock()
265+
defer w.mu.Unlock()
260266
// 如果还没有创建临时文件,先创建
261267
if w.tempFile == nil {
262268
fmt.Printf("S3File.Write: creating temp file\n")
@@ -276,10 +282,21 @@ func (w *S3File) Write(p []byte) (n int, err error) {
276282
}
277283

278284
func (w *S3File) Read(p []byte) (n int, err error) {
279-
// 如果还没有创建缓存文件,先下载到本地
285+
// 并发安全读取
286+
w.mu.Lock()
287+
defer w.mu.Unlock()
288+
// 如果还没有创建缓存/临时文件
280289
if w.tempFile == nil {
281-
if err = w.downloadToTemp(); err != nil {
282-
return 0, err
290+
if w.readOnly {
291+
// 只读模式,从远端下载
292+
if err = w.downloadToTemp(); err != nil {
293+
return 0, err
294+
}
295+
} else {
296+
// 可写模式,创建空的临时文件(允许随后写入/读取)
297+
if err = w.createTempFile(); err != nil {
298+
return 0, err
299+
}
283300
}
284301
}
285302

@@ -288,6 +305,9 @@ func (w *S3File) Read(p []byte) (n int, err error) {
288305
}
289306

290307
func (w *S3File) WriteAt(p []byte, off int64) (n int, err error) {
308+
// 并发安全写入
309+
w.mu.Lock()
310+
defer w.mu.Unlock()
291311
// 如果还没有创建临时文件,先创建
292312
if w.tempFile == nil {
293313
if err = w.createTempFile(); err != nil {
@@ -300,6 +320,9 @@ func (w *S3File) WriteAt(p []byte, off int64) (n int, err error) {
300320
}
301321

302322
func (w *S3File) ReadAt(p []byte, off int64) (n int, err error) {
323+
// 并发安全读取
324+
w.mu.Lock()
325+
defer w.mu.Unlock()
303326
// 如果还没有创建缓存文件,先下载到本地
304327
if w.tempFile == nil {
305328
if err = w.downloadToTemp(); err != nil {
@@ -320,6 +343,21 @@ func (w *S3File) Sync() error {
320343
return nil
321344
}
322345

346+
// 并发安全Sync,避免与 Write/Close 竞争
347+
w.mu.Lock()
348+
defer w.mu.Unlock()
349+
return w.syncNoLock()
350+
}
351+
352+
// syncNoLock assumes caller holds w.mu
353+
func (w *S3File) syncNoLock() error {
354+
// 只读模式不上传
355+
if w.readOnly {
356+
if w.tempFile != nil {
357+
return w.tempFile.Sync()
358+
}
359+
return nil
360+
}
323361
// 如果使用临时文件,先同步到磁盘
324362
if w.tempFile != nil {
325363
fmt.Printf("S3File.Sync: syncing temp file to disk\n")
@@ -341,10 +379,19 @@ func (w *S3File) Sync() error {
341379
}
342380

343381
func (w *S3File) Seek(offset int64, whence int) (int64, error) {
382+
w.mu.Lock()
383+
defer w.mu.Unlock()
344384
// 如果还没有创建临时文件,先创建或下载
345385
if w.tempFile == nil {
346-
if err := w.downloadToTemp(); err != nil {
347-
return 0, err
386+
// 只读模式从远端下载,否则创建空临时文件
387+
if w.readOnly {
388+
if err := w.downloadToTemp(); err != nil {
389+
return 0, err
390+
}
391+
} else {
392+
if err := w.createTempFile(); err != nil {
393+
return 0, err
394+
}
348395
}
349396
}
350397

@@ -353,22 +400,24 @@ func (w *S3File) Seek(offset int64, whence int) (int64, error) {
353400
}
354401

355402
func (w *S3File) Close() error {
356-
fmt.Printf("S3File.Close: closing file, objectKey=%s\n", w.objectKey)
357-
358-
if err := w.Sync(); err != nil {
359-
fmt.Printf("S3File.Close: sync failed, error=%v\n", err)
403+
// 并发安全Close,确保不与其他操作竞争
404+
w.mu.Lock()
405+
defer w.mu.Unlock()
406+
if err := w.syncNoLock(); err != nil {
360407
return err
361408
}
362409

363410
if w.tempFile != nil {
364411
fmt.Printf("S3File.Close: closing temp file\n")
365412
w.tempFile.Close()
413+
w.tempFile = nil
366414
}
367415

368416
// 清理临时文件
369417
if w.filePath != "" {
370418
fmt.Printf("S3File.Close: removing temp file %s\n", w.filePath)
371419
os.Remove(w.filePath)
420+
w.filePath = ""
372421
}
373422

374423
fmt.Printf("S3File.Close: file closed successfully\n")
@@ -405,7 +454,14 @@ func (w *S3File) uploadTempFile() (err error) {
405454
w.storage.config.Bucket, w.objectKey, stat.Size())
406455

407456
// 上传到S3
408-
_, err = w.storage.uploader.UploadWithContext(w.ctx, &s3manager.UploadInput{
457+
// 使用独立的上传上下文,避免因调用方上下文取消导致上传中断
458+
uploadCtx := context.Background()
459+
if w.storage.config.Timeout > 0 {
460+
var cancel context.CancelFunc
461+
uploadCtx, cancel = context.WithTimeout(uploadCtx, w.storage.config.Timeout)
462+
defer cancel()
463+
}
464+
_, err = w.storage.uploader.UploadWithContext(uploadCtx, &s3manager.UploadInput{
409465
Bucket: aws.String(w.storage.config.Bucket),
410466
Key: aws.String(w.objectKey),
411467
Body: w.tempFile,
@@ -423,6 +479,10 @@ func (w *S3File) uploadTempFile() (err error) {
423479

424480
// downloadToTemp 下载S3对象到本地临时文件
425481
func (w *S3File) downloadToTemp() error {
482+
// 防止重复创建
483+
if w.tempFile != nil {
484+
return nil
485+
}
426486
// 创建临时文件
427487
tempFile, err := os.CreateTemp("", "s3reader_*.tmp")
428488
if err != nil {

plugin/flv/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (plugin *FLVPlugin) Download_(w http.ResponseWriter, r *http.Request) {
9696

9797
func (plugin *FLVPlugin) RegisterHandler() map[string]http.HandlerFunc {
9898
return map[string]http.HandlerFunc{
99-
"/jessica/{streamPath}": plugin.jessica,
99+
"/jessica/{streamPath...}": plugin.jessica,
100100
}
101101
}
102102

0 commit comments

Comments
 (0)